[jboss-svn-commits] JBL Code SVN: r24227 - in labs/jbosstm/trunk/XTS: WS-T/dev/src10/com/arjuna/wst/stub and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Dec 3 10:23:34 EST 2008


Author: adinn
Date: 2008-12-03 10:23:34 -0500 (Wed, 03 Dec 2008)
New Revision: 24227

Added:
   labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java
   labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java
Modified:
   labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java
   labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java
   labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java
   labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java
Log:
implementation of AT 1.0 and 1.1 subtransaction recovery which still needs proper testing but does not break the WSTX unit tests

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -133,12 +133,26 @@
     public abstract boolean isCoordinatorRecoveryStarted();
 
     /**
+     * test whether the first AT subordinate coordinator recovery scan has completed. this indicates
+     * whether there may or may not still be unknown AT subtransaction records on disk. If the first
+     * scan has not yet completed then a commit for an unknown subtransaction must raise an exception
+     * delaying commit of the parent transaction.
+     */
+    public abstract boolean isSubordinateCoordinatorRecoveryStarted();
+
+    /**
      * record the fact thatwhether the first AT coordinator recovery scan has completed.
      */
 
     public abstract void setCoordinatorRecoveryStarted();
 
     /**
+     * record the fact that the first AT subordinate coordinator recovery scan has completed.
+     */
+
+    public abstract void setSubordinateCoordinatorRecoveryStarted();
+
+    /**
      * the singleton instance of the recovery manager
      */
 

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -3,19 +3,44 @@
 import com.arjuna.wst.*;
 import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
 import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.OutputObjectState;
+import com.arjuna.ats.arjuna.state.InputObjectState;
 
+import java.io.IOException;
+
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
 /**
  * A durable participant registered on behalf of an interposed WS-AT coordinator in order to ensure that
  * durable participants in the subtransaction prepared, committed and aborted at the right time.
  */
 public class SubordinateDurable2PCStub implements Durable2PCParticipant
 {
+    /**
+     * normal constructor used when the subordinate coordinator is registered as a durable participant
+     * with its parent coordinator.
+     *
+     * @param coordinator
+     */
     public SubordinateDurable2PCStub(SubordinateCoordinator coordinator)
     {
         this.coordinator = coordinator;
+        this.coordinatorId = coordinator.get_uid().stringForm();
+        this.recovered = false;
     }
 
     /**
+     * empty constructor for use only during recovery
+     */
+    public SubordinateDurable2PCStub()
+    {
+        this.coordinator = null;
+        this.coordinatorId = null;
+        this.recovered = true;
+    }
+
+    /**
      * This will be called when the parent coordinator is preparing its durable participants and should ensure
      * that the interposed cooordinator does the same.
      *
@@ -44,7 +69,41 @@
      */
 
     public void commit() throws WrongStateException, SystemException {
-        coordinator.commit();
+        if (!isRecovered()) {
+            coordinator.commit();
+        } else {
+            // first check whether crashed coordinators have been recovered
+            XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+            boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+            // now look for a subordinate coordinator with the right id
+            coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+            if (coordinator == null) {
+                if (!isRecoveryScanStarted) {
+                    // the subtransaction may still be waiting to be resolved
+                    // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+                }
+            } else if(!coordinator.isActivated()) {
+                // the transaction was logged but has not yet been recovered successfully
+                // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+            } else {
+                int status = coordinator.status();
+
+                if (status == ActionStatus.PREPARED) {
+                    // ok, the commit process was not previously initiated so start it now
+                    coordinator.commit();
+                    status = coordinator.status();
+                }
+
+                // check that we are not still committing because of a comms timeout
+
+                if (status == ActionStatus.COMMITTING) {
+                    // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+                }
+            }
+        }
     }
 
     /**
@@ -55,7 +114,34 @@
      */
 
     public void rollback() throws WrongStateException, SystemException {
-        coordinator.rollback();
+        if (!isRecovered()) {
+            coordinator.rollback();
+        } else {
+            // first check whether crashed coordinators have been recovered
+            XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+            boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+            // now look for a subordinate coordinator with the right id
+            coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+            if (coordinator == null) {
+                if (!isRecoveryScanStarted) {
+                    // the subtransaction may still be waiting to be resolved
+                    // throw an exception causing the rollback to be retried later
+                    throw new SystemException();
+                }
+            } else if(!coordinator.isActivated()) {
+                // the transaction was logged but has not yet been recovered successfully
+                // throw an exception causing the rollback to be retried later
+                    throw new SystemException();
+            } else {
+                int status = coordinator.status();
+
+                if (status == ActionStatus.PREPARED) {
+                    // ok, the rollback process was not previously initiated so start it now
+                    coordinator.rollback();
+                    status = coordinator.status();
+                }
+            }
+        }
     }
 
     /**
@@ -75,7 +161,58 @@
     }
 
     /**
+     * Save the state of the participant to the specified output object stream.
+     * @param oos The output object stream.
+     * @return true if persisted, false otherwise.
+     */
+    public boolean saveState(OutputObjectState oos) {
+        // we need to save the id of the subordinate coordinator so we can identify it again
+        // when we are recreated
+        try {
+            oos.packString(coordinatorId);
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Restore the state of the participant from the specified input object stream.
+     * @param ios The Input object stream.
+     * @return true if restored, false otherwise.
+     */
+    public boolean restoreState(InputObjectState ios) {
+        // restore the subordinate coordinator id so we can check to ensure it has been committed
+        // n.b. this must assign the field (not a local) or commit/rollback will look up a null id
+        try {
+            this.coordinatorId = ios.unpackString();
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * test if this participant is recovered
+     */
+    public boolean isRecovered()
+    {
+        return recovered;
+    }
+
+    /**
      * the interposed coordinator
      */
     private SubordinateCoordinator coordinator;
+
+    /**
+     * the interposed coordinator's id
+     */
+    private String coordinatorId;
+
+    /**
+     * a flag indicating whether this participant has been recovered
+     */
+
+    private boolean recovered;
 }
\ No newline at end of file

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -3,19 +3,44 @@
 import com.arjuna.wst.*;
 import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
 import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.OutputObjectState;
+import com.arjuna.ats.arjuna.state.InputObjectState;
 
+import java.io.IOException;
+
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
 /**
  * A durable participant registered on behalf of an interposed WS-AT coordinator in order to ensure that
  * durable participants in the subtransaction prepared, committed and aborted at the right time.
  */
-public class SubordinateDurable2PCStub implements Durable2PCParticipant
+public class SubordinateDurable2PCStub implements Durable2PCParticipant, PersistableParticipant
 {
+    /**
+     * normal constructor used when the subordinate coordinator is registered as a durable participant
+     * with its parent coordinator.
+     *
+     * @param coordinator
+     */
     public SubordinateDurable2PCStub(SubordinateCoordinator coordinator)
     {
         this.coordinator = coordinator;
+        this.coordinatorId = coordinator.get_uid().stringForm();
+        this.recovered = false;
     }
 
     /**
+     * empty constructor for use only during recovery
+     */
+    public SubordinateDurable2PCStub()
+    {
+        this.coordinator = null;
+        this.coordinatorId = null;
+        this.recovered = true;
+    }
+
+    /**
      * This will be called when the parent coordinator is preparing its durable participants and should ensure
      * that the interposed cooordinator does the same.
      *
@@ -44,7 +69,41 @@
      */
 
     public void commit() throws WrongStateException, SystemException {
-        coordinator.commit();
+        if (!isRecovered()) {
+            coordinator.commit();
+        } else {
+            // first check whether crashed coordinators have been recovered
+            XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+            boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+            // now look for a subordinate coordinator with the right id
+            coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+            if (coordinator == null) {
+                if (!isRecoveryScanStarted) {
+                    // the subtransaction may still be waiting to be resolved
+                    // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+                }
+            } else if(!coordinator.isActivated()) {
+                // the transaction was logged but has not yet been recovered successfully
+                // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+            } else {
+                int status = coordinator.status();
+
+                if (status == ActionStatus.PREPARED) {
+                    // ok, the commit process was not previously initiated so start it now
+                    coordinator.commit();
+                    status = coordinator.status();
+                }
+
+                // check that we are not still committing because of a comms timeout
+
+                if (status == ActionStatus.COMMITTING) {
+                    // throw an exception causing the commit to be retried later
+                    throw new SystemException();
+                }
+            }
+        }
     }
 
     /**
@@ -55,7 +114,34 @@
      */
 
     public void rollback() throws WrongStateException, SystemException {
-        coordinator.rollback();
+        if (!isRecovered()) {
+            coordinator.rollback();
+        } else {
+            // first check whether crashed coordinators have been recovered
+            XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+            boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+            // now look for a subordinate coordinator with the right id
+            coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+            if (coordinator == null) {
+                if (!isRecoveryScanStarted) {
+                    // the subtransaction may still be waiting to be resolved
+                    // throw an exception causing the rollback to be retried later
+                    throw new SystemException();
+                }
+            } else if(!coordinator.isActivated()) {
+                // the transaction was logged but has not yet been recovered successfully
+                // throw an exception causing the rollback to be retried later
+                    throw new SystemException();
+            } else {
+                int status = coordinator.status();
+
+                if (status == ActionStatus.PREPARED) {
+                    // ok, the rollback process was not previously initiated so start it now
+                    coordinator.rollback();
+                    status = coordinator.status();
+                }
+            }
+        }
     }
 
     /**
@@ -75,7 +161,58 @@
     }
 
     /**
+     * Save the state of the participant to the specified output object stream.
+     * @param oos The output object stream.
+     * @return true if persisted, false otherwise.
+     */
+    public boolean saveState(OutputObjectState oos) {
+        // we need to save the id of the subordinate coordinator so we can identify it again
+        // when we are recreated
+        try {
+            oos.packString(coordinatorId);
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Restore the state of the participant from the specified input object stream.
+     * @param ios The Input object stream.
+     * @return true if restored, false otherwise.
+     */
+    public boolean restoreState(InputObjectState ios) {
+        // restore the subordinate coordinator id so we can check to ensure it has been committed
+        // n.b. this must assign the field (not a local) or commit/rollback will look up a null id
+        try {
+            this.coordinatorId = ios.unpackString();
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * test if this participant is recovered
+     */
+    public boolean isRecovered()
+    {
+        return recovered;
+    }
+
+    /**
      * the interposed coordinator
      */
     private SubordinateCoordinator coordinator;
+
+    /**
+     * the interposed coordinator's id
+     */
+    private String coordinatorId;
+
+    /**
+     * a flag indicating whether this participant has been recovered
+     */
+
+    private boolean recovered;
 }
\ No newline at end of file

Modified: labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java
===================================================================
--- labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -44,6 +44,8 @@
 
 import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ACCoordinator;
 
+import java.util.HashMap;
+
 /**
  * This class represents a specific coordination instance. It is essentially an
  * ArjunaCore TwoPhaseCoordinator, which gives us access to two-phase with
@@ -58,15 +60,24 @@
 
 public class SubordinateCoordinator extends ACCoordinator
 {
-	
+
+    /**
+     * normal constructor
+     */
 	public SubordinateCoordinator ()
 	{
 		super();
+        activated = true;
 	}
 
+    /**
+     * constructor for recovered coordinator
+     * @param recovery
+     */
 	public SubordinateCoordinator (Uid recovery)
 	{
 		super(recovery);
+        activated = false;
 	}
 
 	/**
@@ -233,6 +244,11 @@
     {
     }
 
+    public String type ()
+    {
+        return "/StateManager/BasicAction/AtomicAction/TwoPhaseCoordinator/TwoPhase/SubordinateCoordinator";
+    }
+
     /**
      * return a uid for the volatile participant registered on behalf of this corodinator
      */
@@ -250,9 +266,42 @@
     }
 
 
+    protected static synchronized void addRecoveredCoordinator(SubordinateCoordinator coordinator)
+    {
+        recoveredCoordinators.put(coordinator.get_uid().stringForm(), coordinator);
+    }
+
+    protected static synchronized void removeRecoveredCoordinator(SubordinateCoordinator coordinator) {
+        // drop the entry outright; mapping the id to null would leave a stale key in the table
+        recoveredCoordinators.remove(coordinator.get_uid().stringForm());
+    }
+
+    protected void setActivated()
+    {
+        activated = true;
+    }
+
+    public boolean isActivated()
+    {
+        return activated;
+    }
+
+    public static synchronized SubordinateCoordinator getRecoveredCoordinator(String coordinatorId)
+    {
+        return recoveredCoordinators.get(coordinatorId);
+    }
+
     /**
      * this saves the status after the subtransaction commit or rollback so it can be referred to during
      * afterCompletion processing.
      */
     private int finalStatus = ActionStatus.CREATED;
+
+    /**
+     * flag identifying whether this coordinator is active, set true for normal transactions and false
+     * for recovered transactions until they are activated
+     */
+    private boolean activated;
+
+    private static final HashMap<String, SubordinateCoordinator> recoveredCoordinators = new HashMap<String, SubordinateCoordinator>();
 }

Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -22,6 +22,7 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.jbossts.xts.recovery.coordinator.at.ACCoordinatorRecoveryModule;
+import org.jboss.jbossts.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule;
 import org.jboss.jbossts.xts.recovery.coordinator.ba.BACoordinatorRecoveryModule;
 import org.jboss.jbossts.xts.recovery.participant.at.ATParticipantRecoveryModule;
 import org.jboss.jbossts.xts.recovery.participant.ba.BAParticipantRecoveryModule;
@@ -107,6 +108,7 @@
     private final Logger log = org.jboss.logging.Logger.getLogger(XTSService.class);
 
     private ACCoordinatorRecoveryModule acCoordinatorRecoveryModule = null;
+    private SubordinateCoordinatorRecoveryModule subordinateCoordinatorRecoveryModule = null;
     private ATParticipantRecoveryModule atParticipantRecoveryModule = null;
 
     private BACoordinatorRecoveryModule baCoordinatorRecoveryModule = null;
@@ -225,6 +227,9 @@
 
         TaskManagerInitialisation(); // com.arjuna.services.framework.admin.TaskManagerInitialisation : initialise the Task Manager
 
+
+        // install the AT recovery modules
+        
         acCoordinatorRecoveryModule = new ACCoordinatorRecoveryModule();
 
         // ensure Implementations are installed into the inventory before we register the module
@@ -232,6 +237,13 @@
         acCoordinatorRecoveryModule.install();
 
         // we don't need to install anything in the Inventory for this recovery module as it
+        // uses the same records as those employed by ACCoordinatorRecoveryModule
+
+        subordinateCoordinatorRecoveryModule = new SubordinateCoordinatorRecoveryModule();
+
+        subordinateCoordinatorRecoveryModule.install();
+
+        // we don't need to install anything in the Inventory for this recovery module as it
         // manages its own ObjectStore records but we do need it to create the recovery manager
         // singleton.
 
@@ -267,6 +279,7 @@
         RecoveryManager.manager().addModule(baParticipantRecoveryModule);
 
         RecoveryManager.manager().addModule(acCoordinatorRecoveryModule);
+        RecoveryManager.manager().addModule(subordinateCoordinatorRecoveryModule);
         RecoveryManager.manager().addModule(baCoordinatorRecoveryModule);
     }
 
@@ -280,6 +293,12 @@
             // ok, now it is safe to get the recovery manager to uninstall its Implementations from the inventory
             baCoordinatorRecoveryModule.uninstall();
         }
+        if (subordinateCoordinatorRecoveryModule != null) {
+            // remove the module, making sure any scan which might be using it has completed
+            RecoveryManager.manager().removeModule(subordinateCoordinatorRecoveryModule, true);
+            // ok, now it is safe to get the recovery manager to uninstall its Implementations from the inventory
+            subordinateCoordinatorRecoveryModule.uninstall();
+        }
         if (acCoordinatorRecoveryModule != null) {
             // remove the module, making sure any scan which might be using it has completed
             RecoveryManager.manager().removeModule(acCoordinatorRecoveryModule, true);

Copied: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java (from rev 23835, labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverACCoordinator.java)
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java	                        (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -0,0 +1,123 @@
+package org.jboss.jbossts.xts.recovery.coordinator.at;
+
+import org.jboss.jbossts.xts.logging.XTSLogger;
+
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed WSAT SubordinateCoordinator transactions.
+ *
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1] - RecoverSubordinateCoordinator.replayPhase2 recovering {0} ActionStatus is {1}
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_2 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_2] - RecoverSubordinateCoordinator.replayPhase2: Unexpected status: {0}
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3] - RecoverSubordinateCoordinator.replayPhase2( {0} )  finished
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4] - RecoverSubordinateCoordinator.replayPhase2 transaction {0} not activated, unable to replay phase 2 commit
+*/
+public class RecoverSubordinateCoordinator extends SubordinateCoordinator {
+
+   /**
+    * Re-creates/activates an AtomicAction for the specified
+    * transaction Uid.
+    */
+   public RecoverSubordinateCoordinator( Uid rcvUid )
+   {
+      super( rcvUid ) ;
+      _activated = activate() ;
+   }
+
+    /**
+     * run parent activate and also make this coordinator visible if there might be a durable participant waiting
+     * for it to commit.
+     * @return
+     */
+    public boolean activate()
+    {
+        boolean result = super.activate();
+        // record whether the activation worked
+        if (result) {
+            setActivated();
+        }
+
+        int status = status();
+        if (result == false || (status == ActionStatus.PREPARED || status == ActionStatus.COMMITTING)) {
+            // we need to install this coordinator in a global table so that the participant which
+            // was driving it will know that it has been recovered but not yet committed
+            // n.b. we do this even if the activation failed because we need to ensure the
+            // participant rejects a commit until this transaction has committed
+            
+            SubordinateCoordinator.addRecoveredCoordinator(this);
+        }
+
+        return result;
+    }
+
+   /**
+    * Replays phase 2 of the commit protocol.
+    */
+   public void replayPhase2()
+   {
+       final int status = status();
+
+       if (XTSLogger.arjLoggerI18N.debugAllowed())
+       {
+	   XTSLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+					FacilityCode.FAC_CRASH_RECOVERY,
+					"org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1",
+					new Object[]{get_uid(), ActionStatus.stringForm(status)});
+       }
+
+       if ( _activated )
+       {
+           // we don't run phase 2 again if status is PREPARED because we need to wait for the
+           // parent coordinator to tell us what to do
+
+           // we automatically rerun phase2 if the action status is COMMITTING, which happens when
+           // we get a comms timeout from one of the participants after sending it a COMMIT message.
+
+       if ((status == ActionStatus.COMMITTING) ||
+               (status == ActionStatus.COMMITTED) ||
+               (status == ActionStatus.H_COMMIT) ||
+               (status == ActionStatus.H_MIXED) ||
+               (status == ActionStatus.H_HAZARD))
+	   {
+	       super.phase2Commit( _reportHeuristics ) ;
+	   } else if ((status ==  ActionStatus.ABORTED) ||
+               (status == ActionStatus.H_ROLLBACK) ||
+               (status == ActionStatus.ABORTING) ||
+               (status == ActionStatus.ABORT_ONLY))
+       {
+           super.phase2Abort( _reportHeuristics ) ;
+       }
+
+       if (XTSLogger.arjLoggerI18N.debugAllowed())
+	   {
+	       XTSLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+					    FacilityCode.FAC_CRASH_RECOVERY,
+					    "org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3",
+					    new Object[]{get_uid()});
+	   }
+       }
+       else
+       {
+	   XTSLogger.arjLoggerI18N.warn("org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4", new Object[]{get_uid()});
+       }
+
+       if ((status == ActionStatus.PREPARED) ||
+               (status == ActionStatus.COMMITTING)) {
+           SubordinateCoordinator.removeRecoveredCoordinator(this);
+       }
+   }
+
+   // Flag to indicate that this transaction has been re-activated
+   // successfully.
+   private boolean _activated = false ;
+
+   // whether heuristic reporting on phase 2 commit is enabled.
+   private boolean _reportHeuristics = true ;
+}
\ No newline at end of file


Property changes on: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Copied: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java (from rev 23835, labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/ACCoordinatorRecoveryModule.java)
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java	                        (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -0,0 +1,342 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2007,
+ * @author Red Hat Middleware LLC.
+ */
+package org.jboss.jbossts.xts.recovery.coordinator.at;
+
+import org.jboss.jbossts.xts.logging.XTSLogger;
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
+import com.arjuna.ats.arjuna.recovery.RecoveryModule;
+import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.InputObjectState;
+import com.arjuna.ats.arjuna.exceptions.ObjectStoreException;
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.objectstore.ObjectStore;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
+
+import java.util.Vector;
+import java.util.Enumeration;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed XTS AT subordinate (SubordinateCoordinator) transactions.
+ * (instances of com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator)
+ *
+ * $Id$
+ *
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1] - RecoveryManagerStatusModule: Object store exception: {0}
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2] - failed to recover Transaction {0} {1}
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3] - failed to access transaction store {0} {1}
+ */
+public class SubordinateCoordinatorRecoveryModule implements RecoveryModule
+{
+    public SubordinateCoordinatorRecoveryModule() // scans for the type reported by SubordinateCoordinator.type()
+    {
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug
+                    ( DebugLevel.CONSTRUCTORS,
+                            VisibilityLevel.VIS_PUBLIC,
+                            FacilityCode.FAC_CRASH_RECOVERY,
+                            "SubordinateCoordinatorRecoveryModule created - default" );
+        }
+
+        if (_transactionStore == null) // the store reference is static so it is only fetched while still unset
+        {
+            _transactionStore = TxControl.getStore() ;
+        }
+
+        _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+    }
+
+    /**
+     * called by the service startup code before the recovery module is added to the recovery manager's
+     * module list. this module performs no setup of its own.
+     */
+    public void install()
+    {
+        // nothing to do here as we share the implementations used by the ACCoordinatorRecoveryModule
+    }
+
+    /**
+     * presumably called after the module is removed from the recovery manager's module list in order to allow the implementations list to be purged of this module's implementations
+     */
+    public void uninstall()
+    {
+        // nothing to do here as we share the implementations used by the ACCoordinatorRecoveryModule
+    }
+
+    /**
+     * Called periodically by the RecoveryManager: collects the uids currently stored under this module's transaction type.
+     */
+    public void periodicWorkFirstPass()
+    {
+        // set when the object store reports that it could list the uids for the transaction type
+        boolean foundUids = false ;
+
+        // packed uids of the stored transactions, filled in by the object store
+        InputObjectState uids = new InputObjectState() ;
+
+        try
+        {
+            if (XTSLogger.arjLogger.isDebugEnabled())
+            {
+                XTSLogger.arjLogger.debug( "SubordinateCoordinatorRecoveryModule: first pass " );
+            }
+
+            foundUids = _transactionStore.allObjUids( _transactionType, uids );
+
+        }
+        catch ( ObjectStoreException ex )
+        {
+            if (XTSLogger.arjLoggerI18N.isWarnEnabled())
+            {
+                XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1",
+                        new Object[]{ex});
+            }
+        }
+
+        if ( foundUids ) // otherwise any uid vector left by an earlier scan is retained
+        {
+            _transactionUidVector = processTransactions( uids ) ;
+        }
+    }
+
+    public void periodicWorkSecondPass() // second pass of a scan: processes the uids collected by the first pass
+    {
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug( "SubordinateCoordinatorRecoveryModule: Second pass " );
+        }
+
+        if (_transactionUidVector != null) {
+            processTransactionsStatus() ;
+        }
+
+        // n.b. processTransactionsStatus notifies the recovery manager once the uids have been processed
+
+    }
+
+    protected SubordinateCoordinatorRecoveryModule(String type) // type replaces the default object store type to scan
+    {
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug
+                    ( DebugLevel.CONSTRUCTORS,
+                            VisibilityLevel.VIS_PUBLIC,
+                            FacilityCode.FAC_CRASH_RECOVERY,
+                            "SubordinateCoordinatorRecoveryModule created " + type );
+        }
+
+        if (_transactionStore == null) // the store reference is static so it is only fetched while still unset
+        {
+            _transactionStore = TxControl.getStore() ;
+        }
+
+        _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+        _transactionType = type;
+
+    }
+
+    private void doRecoverTransaction( Uid recoverUid )
+    {
+        // Replay phase 2 of the logged transaction identified by recoverUid unless the
+        // status obtained for it shows that it is still in flight.
+        // First retrieve the transaction status from its original process.
+        // n.b. for a non-active XTS TX this status will always be committed even
+        // if it aborted or had a heuristic outcome. in that case we need to use
+        // the logged action status which can only be retrieved after activation
+
+        int theStatus = _transactionStatusConnectionMgr.getTransactionStatus( _transactionType, recoverUid ) ;
+
+        boolean inFlight = isTransactionInMidFlight( theStatus ) ;
+
+        String statusName = ActionStatus.stringForm( theStatus ) ;
+
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug
+                    ( DebugLevel.FUNCTIONS,
+                            VisibilityLevel.VIS_PUBLIC,
+                            FacilityCode.FAC_CRASH_RECOVERY,
+                            "transaction type is "+ _transactionType + " uid is " +
+                                    recoverUid.toString() + "\n ActionStatus is " + statusName +
+                                    " in flight is " + inFlight ) ;
+        }
+
+        if ( ! inFlight )
+        {
+            try
+            {
+                XTSLogger.arjLogger.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                            FacilityCode.FAC_CRASH_RECOVERY, "jjh doing revovery here for "+recoverUid);
+                // TODO jjh
+                RecoverSubordinateCoordinator rcvSubordinateCoordinator =
+                        new RecoverSubordinateCoordinator(recoverUid);
+                rcvSubordinateCoordinator.replayPhase2();
+            }
+            catch ( Exception ex ) // a failed replay is only logged here, it is not rethrown
+            {
+                if (XTSLogger.arjLoggerI18N.isWarnEnabled())
+                {
+                    XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2",
+                            new Object[]{recoverUid.toString(), ex});
+                }
+            }
+        }
+    }
+
+    private boolean isTransactionInMidFlight( int status ) // true for the not yet completed states, false for any other status
+    {
+        boolean inFlight = false ;
+
+        switch ( status )
+        {
+            // these states can only come from a process that is still alive
+            case ActionStatus.RUNNING    :
+            case ActionStatus.ABORT_ONLY :
+            case ActionStatus.PREPARING  :
+            case ActionStatus.COMMITTING :
+            case ActionStatus.ABORTING   :
+            case ActionStatus.PREPARED   :
+                inFlight = true ;
+                break ;
+
+                // the transaction is apparently still there, but has completed its
+                // phase2. should be safe to redo it.
+            case ActionStatus.COMMITTED  :
+            case ActionStatus.H_COMMIT   :
+            case ActionStatus.H_MIXED    :
+            case ActionStatus.H_HAZARD   :
+            case ActionStatus.ABORTED    :
+            case ActionStatus.H_ROLLBACK :
+                inFlight = false ;
+                break ;
+
+                // this shouldn't happen. invalid or unrecognised values are treated as not in flight
+            case ActionStatus.INVALID :
+            default:
+                inFlight = false ;
+        }
+
+        return inFlight ;
+    }
+
+    private Vector processTransactions( InputObjectState uids ) // unpacks the uids held in the supplied state into a Vector of Uid
+    {
+        Vector uidVector = new Vector() ;
+
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug( DebugLevel.FUNCTIONS,
+                    VisibilityLevel.VIS_PUBLIC,
+                    FacilityCode.FAC_CRASH_RECOVERY,
+                    "processing " + _transactionType
+                            + " transactions" ) ;
+        }
+
+        Uid theUid = new Uid( Uid.nullUid() ); // scratch uid which is the target of every unpack
+
+        boolean moreUids = true ;
+
+        while (moreUids)
+        {
+            try
+            {
+                theUid.unpack( uids ) ;
+
+                if (theUid.equals( Uid.nullUid() )) // a null uid ends the list
+                {
+                    moreUids = false;
+                }
+                else
+                {
+                    Uid newUid = new Uid( theUid ) ; // copy because theUid is reused by the next unpack
+
+                    if (XTSLogger.arjLogger.isDebugEnabled())
+                    {
+                        XTSLogger.arjLogger.debug
+                                ( DebugLevel.FUNCTIONS,
+                                        VisibilityLevel.VIS_PUBLIC,
+                                        FacilityCode.FAC_CRASH_RECOVERY,
+                                        "found transaction "+ newUid ) ;
+                    }
+
+                    uidVector.addElement( newUid ) ;
+                }
+            }
+            catch ( Exception ex ) // an unpack failure also ends the loop, keeping the uids read so far
+            {
+                moreUids = false;
+            }
+        }
+        return uidVector ;
+    }
+
+    private void processTransactionsStatus() // recovers every uid from the first pass which the store still knows about
+    {
+        // Walk the Vector of transaction Uids built by the first pass
+        Enumeration transactionUidEnum = _transactionUidVector.elements() ;
+
+        while ( transactionUidEnum.hasMoreElements() )
+        {
+            Uid currentUid = (Uid) transactionUidEnum.nextElement();
+
+            try
+            {
+                if ( _transactionStore.currentState( currentUid, _transactionType ) != ObjectStore.OS_UNKNOWN )
+                {
+                    doRecoverTransaction( currentUid ) ;
+                }
+            }
+            catch ( ObjectStoreException ex )
+            {
+                if (XTSLogger.arjLoggerI18N.isWarnEnabled()) // guard with the logger which emits the warning
+                {
+                    XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3",
+                            new Object[]{currentUid.toString(), ex});
+                }
+            }
+        }
+
+        XTSATRecoveryManager.getRecoveryManager().setSubordinateCoordinatorRecoveryStarted(); // record that a scan has completed
+    }
+
+    // 'type' within the Object Store for SubordinateCoordinator.
+    private String _transactionType = new SubordinateCoordinator().type() ;
+
+    // Vector of uids for the transactions found in the object store of the
+    // SubordinateCoordinator type.
+    private Vector _transactionUidVector = null ;
+
+    // Reference to the Object Store.
+    private static ObjectStore _transactionStore = null ;
+
+    // This object manages the interface to all TransactionStatusManagers
+    // processes(JVMs) on this system/node.
+    private TransactionStatusConnectionManager _transactionStatusConnectionMgr ;
+
+}
\ No newline at end of file


Property changes on: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java	2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java	2008-12-03 15:23:34 UTC (rev 24227)
@@ -299,6 +299,16 @@
     }
 
     /**
+     * test whether the first AT subordinate coordinator recovery scan has completed. this indicates
+     * whether there may or may not still be unknown AT subtransaction records on disk. If the first
+     * scan has not yet completed then a commit for an unknown subtransaction must raise an exception
+     * delaying commit of the parent transaction.
+     */
+    public synchronized boolean isSubordinateCoordinatorRecoveryStarted() {
+        return subordinateCoordinatorRecoveryStarted;
+    }
+
+    /**
      * record the fact that the first AT coordinator recovery scan has completed.
      */
 
@@ -307,6 +317,14 @@
     }
 
     /**
+     * record the fact that the first AT subordinate coordinator recovery scan has completed.
+     */
+
+    public synchronized void setSubordinateCoordinatorRecoveryStarted() {
+        subordinateCoordinatorRecoveryStarted = true;
+    }
+
+    /**
      * a global flag indicating whether the first AT participant recovery scan has
      * been performed.
      */
@@ -319,6 +337,12 @@
     private boolean coordinatorRecoveryStarted = false;
 
     /**
+     * a global flag indicating whether the first AT subordinate coordinator recovery scan has
+     * been performed.
+     */
+    private boolean subordinateCoordinatorRecoveryStarted = false;
+
+    /**
      * a map from participant ids to participant recovery records
      */
     private HashMap<String, ATParticipantRecoveryRecord> recoveryMap = new HashMap<String, ATParticipantRecoveryRecord>();




More information about the jboss-svn-commits mailing list