[jboss-svn-commits] JBL Code SVN: r21519 - labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Aug 13 12:06:34 EDT 2008


Author: adinn
Date: 2008-08-13 12:06:34 -0400 (Wed, 13 Aug 2008)
New Revision: 21519

Modified:
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
Log:
added missing function to 1.0 participant recovery ensuring participant record is cleaned up properly if multiple threads are involved in rollback

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java	2008-08-13 16:02:30 UTC (rev 21518)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java	2008-08-13 16:06:34 UTC (rev 21519)
@@ -35,6 +35,7 @@
 import com.arjuna.webservices.wsat.NotificationType;
 import com.arjuna.webservices.wsat.ParticipantInboundEvents;
 import com.arjuna.webservices.wsat.State;
+import com.arjuna.webservices.wsat.AtomicTransactionConstants;
 import com.arjuna.webservices.wsat.client.CoordinatorClient;
 import com.arjuna.webservices.wsat.processors.ParticipantProcessor;
 import com.arjuna.webservices.wscoor.CoordinationConstants;
@@ -76,6 +77,11 @@
     private boolean recovered ;
 
     /**
+     * true if this participant's recovery details have been logged to disk otherwise false
+     */
+    private boolean persisted;
+
+    /**
      * Construct the initial engine for the participant.
      * @param participant The participant.
      * @param id The participant id.
@@ -100,6 +106,7 @@
         this.state = state ;
         this.coordinator = coordinator ;
         this.recovered = recovered;
+        this.persisted = recovered;
     }
     
     /**
@@ -197,6 +204,8 @@
      * PreparedSuccess -> Aborting (execute rollback, send aborted and forget)
      * Committing -> Committing (ignore)
      * Aborting -> Aborting (send aborted and forget)
+     *
+     *  @message com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1] could not delete recovery record for participant {0}
      */
     public void rollback(final NotificationType rollback, final AddressingContext addressingContext, final ArjunaContext arjunaContext)
     {
@@ -222,6 +231,25 @@
                 }
             }
             
+            // if the participant managed to persist the log record then we should try
+            // to delete it. note that persisted can only be set to true by the PREPARING
+            // thread. if it detects a transition to ABORTING while it is doing the log write
+            // it will clear up itself.
+
+            if (persisted && participant instanceof Durable2PCParticipant) {
+                // if we cannot delete the participant we effectively drop the rollback message
+                // here in the hope that we have better luck next time..
+                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+                    // hmm, could not delete entry -- leave it so we can maybe retry later
+                    if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+                    {
+                        WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1", new Object[] {id}) ;
+                    }
+
+                    return;
+                }
+            }
+
             sendAborted() ;
             
             if (current != null)
@@ -290,6 +318,8 @@
      * @param arjunaContext The arjuna context.
      * 
      * @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_1] - Unexpected SOAP fault for participant {0}: {1} {2}
+     * @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2] - Unrecoverable error for participant {0} : {1} {2}
+     * @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3] - Unable to delete recovery record at commit for participant {0}
      */
     public void soapFault(final SoapFault soapFault, final AddressingContext addressingContext, final ArjunaContext arjunaContext)
     {
@@ -303,6 +333,37 @@
         
         if (CoordinationConstants.WSCOOR_ERROR_CODE_INVALID_STATE_QNAME.equals(soapFault.getSubcode()))
         {
+            if (WSTLogger.arjLoggerI18N.isErrorEnabled())
+            {
+                final SoapFaultType soapFaultType = soapFault.getSoapFaultType() ;
+                final QName subCode = soapFault.getSubcode() ;
+                WSTLogger.arjLoggerI18N.error("com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2", new Object[] {id, soapFaultType, subCode}) ;
+            }
+
+            // unrecoverable error -- forget this participant and delete any persistent
+            //  record of it
+            final State current ;
+
+            synchronized(this)
+            {
+                current = state;
+                state = null;
+            }
+
+            if (persisted && participant instanceof Durable2PCParticipant) {
+                // remove any durable participant recovery record from the persistent store
+                Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
+
+                // if we cannot delete the participant we record an error here
+                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+                    // hmm, could not delete entry -- log an error
+                    if (WSTLogger.arjLoggerI18N.isErrorEnabled())
+                    {
+                        WSTLogger.arjLoggerI18N.error("com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3", new Object[] {id}) ;
+                    }
+                }
+            }
+
             forget() ;
         }
     }
@@ -312,10 +373,16 @@
      * 
      * Preparing -> PreparedSuccess (send Prepared)
      * Committing -> Committing (send committed and forget)
+     *
+     * @message com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2 [com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2] - Unable to delete recovery record during prepare for participant {0}
+     * @message com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3 [com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3] - Unable to delete recovery record at commit for participant {0}
      */
     private void commitDecision()
     {
-        final State current ;
+        State current ;
+        boolean rollbackRequired  = false;
+        boolean deleteRequired  = false;
+
         synchronized(this)
         {
             current = state ;
@@ -327,37 +394,97 @@
         
         if (current == State.STATE_PREPARING)
         {
+            // ok, we need to write the recovery details to log and send prepared.
+            // if we cannot write the log then we have to rollback the participant
+            //  and send aborted.
             if (participant instanceof Durable2PCParticipant) {
                 // write a durable participant recovery record to the persistent store
                 Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
 
                 ATParticipantRecoveryRecord recoveryRecord = new ATParticipantRecoveryRecord(id, durableParticipant, coordinator);
+
                 if (!XTSATRecoveryManager.getRecoveryManager().writeParticipantRecoveryRecord(recoveryRecord)) {
-                    // we need to revert the state to PREPARING
-                    state = State.STATE_PREPARING;
-                    return;
+                    // we need to rollback and send aborted unless some other thread
+                    //gets there first
+                    rollbackRequired = true;
                 }
             }
 
-            sendPrepared() ;
+            // recheck state in case a rollback or readonly came in while we were writing the
+            // log record
+            synchronized (this) {
+                current = state;
+
+                if (current == State.STATE_PREPARED_SUCCESS) {
+                    if (rollbackRequired) {
+                        // if we change state to aborting then we are responsible for
+                        // calling rollback and sending aborted but we have no log record
+                        // to delete
+                        state = State.STATE_ABORTING;
+                    } else {
+                        // this ensures any subsequent commit or rollback deletes the log record
+                        // so we still have no log record to delete here
+                        persisted = true;
+                    }
+                } else if (!rollbackRequired) {
+                    // an incoming rollback or readonly changed the state to aborted or null so
+                    // it will already have performed a rollback if required but we need to
+                    // delete the log record since the rollback/readonly thread did not know
+                    // about it
+                    deleteRequired = true;
+                }
+            }
+
+            if (rollbackRequired)
+            {
+                // we need to do the rollback and send aborted
+
+                executeRollback();
+
+                sendAborted();
+                forget();
+            } else if (deleteRequired) {
+                // just try to delete the log entry -- any required aborted has already been sent
+
+                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+                    // hmm, could not delete entry -- log a warning
+                    if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+                    {
+                        WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2", new Object[] {id}) ;
+                    }
+                }
+            } else {
+                // whew got through -- send a prepared
+                sendPrepared() ;
+            }
         }
         else if (current == State.STATE_COMMITTING)
         {
-            if (participant instanceof Durable2PCParticipant) {
+            if (persisted && participant instanceof Durable2PCParticipant) {
                 // remove any durable participant recovery record from the persistent store
                 Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
 
                 // if we cannot delete the participant we effectively drop the commit message
-                // here in the hope that we have better luck next time. the delete call will
-                // already have logged a warning so there is no else branch here.
-                if (XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
-                    sendCommitted();
-                    forget();
+                // here in the hope that we have better luck next time.
+                
+                if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+                    // hmm, could not delete entry -- log a warning
+                    if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+                    {
+                        WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3", new Object[] {id}) ;
+                    }
+                    // now revert back to PREPARED_SUCCESS and drop message awaiting a retry
+
+                    synchronized (this) {
+                        state = State.STATE_PREPARED_SUCCESS;
+                    }
+
+                    return;
                 }
-            } else {
-                sendCommitted() ;
-                forget() ;
             }
+
+            sendCommitted() ;
+            forget() ;
         }
     }
     




More information about the jboss-svn-commits mailing list