[jboss-svn-commits] JBL Code SVN: r21519 - labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Aug 13 12:06:34 EDT 2008
Author: adinn
Date: 2008-08-13 12:06:34 -0400 (Wed, 13 Aug 2008)
New Revision: 21519
Modified:
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
Log:
added missing function to 1.0 participant recovery ensuring participant record is cleaned up properly if multiple threads are involved in rollback
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java 2008-08-13 16:02:30 UTC (rev 21518)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java 2008-08-13 16:06:34 UTC (rev 21519)
@@ -35,6 +35,7 @@
import com.arjuna.webservices.wsat.NotificationType;
import com.arjuna.webservices.wsat.ParticipantInboundEvents;
import com.arjuna.webservices.wsat.State;
+import com.arjuna.webservices.wsat.AtomicTransactionConstants;
import com.arjuna.webservices.wsat.client.CoordinatorClient;
import com.arjuna.webservices.wsat.processors.ParticipantProcessor;
import com.arjuna.webservices.wscoor.CoordinationConstants;
@@ -76,6 +77,11 @@
private boolean recovered ;
/**
+ * true if this participant's recovery details have been logged to disk, otherwise false
+ */
+ private boolean persisted;
+
+ /**
* Construct the initial engine for the participant.
* @param participant The participant.
* @param id The participant id.
@@ -100,6 +106,7 @@
this.state = state ;
this.coordinator = coordinator ;
this.recovered = recovered;
+ this.persisted = recovered;
}
/**
@@ -197,6 +204,8 @@
* PreparedSuccess -> Aborting (execute rollback, send aborted and forget)
* Committing -> Committing (ignore)
* Aborting -> Aborting (send aborted and forget)
+ *
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1] could not delete recovery record for participant {0}
*/
public void rollback(final NotificationType rollback, final AddressingContext addressingContext, final ArjunaContext arjunaContext)
{
@@ -222,6 +231,25 @@
}
}
+ // if the participant managed to persist the log record then we should try
+ // to delete it. note that persisted can only be set to true by the PREPARING
+ // thread. if it detects a transition to ABORTING while it is doing the log write
+ // it will clear up itself.
+
+ if (persisted && participant instanceof Durable2PCParticipant) {
+ // if we cannot delete the participant we effectively drop the rollback message
+ // here in the hope that we have better luck next time..
+ if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+ // hmm, could not delete entry -- leave it so we can maybe retry later
+ if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.rollback_1", new Object[] {id}) ;
+ }
+
+ return;
+ }
+ }
+
sendAborted() ;
if (current != null)
@@ -290,6 +318,8 @@
* @param arjunaContext The arjuna context.
*
* @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_1] - Unexpected SOAP fault for participant {0}: {1} {2}
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2] - Unrecoverable error for participant {0} : {1} {2}
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3 [com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3] - Unable to delete recovery record at commit for participant {0}
*/
public void soapFault(final SoapFault soapFault, final AddressingContext addressingContext, final ArjunaContext arjunaContext)
{
@@ -303,6 +333,37 @@
if (CoordinationConstants.WSCOOR_ERROR_CODE_INVALID_STATE_QNAME.equals(soapFault.getSubcode()))
{
+ if (WSTLogger.arjLoggerI18N.isErrorEnabled())
+ {
+ final SoapFaultType soapFaultType = soapFault.getSoapFaultType() ;
+ final QName subCode = soapFault.getSubcode() ;
+ WSTLogger.arjLoggerI18N.error("com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_2", new Object[] {id, soapFaultType, subCode}) ;
+ }
+
+ // unrecoverable error -- forget this participant and delete any persistent
+ // record of it
+ final State current ;
+
+ synchronized(this)
+ {
+ current = state;
+ state = null;
+ }
+
+ if (persisted && participant instanceof Durable2PCParticipant) {
+ // remove any durable participant recovery record from the persistent store
+ Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
+
+ // if we cannot delete the participant we record an error here
+ if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+ // hmm, could not delete entry -- log an error
+ if (WSTLogger.arjLoggerI18N.isErrorEnabled())
+ {
+ WSTLogger.arjLoggerI18N.error("com.arjuna.wst.messaging.engines.ParticipantEngine.soapFault_3", new Object[] {id}) ;
+ }
+ }
+ }
+
forget() ;
}
}
@@ -312,10 +373,16 @@
*
* Preparing -> PreparedSuccess (send Prepared)
* Committing -> Committing (send committed and forget)
+ *
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2 [com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2] - Unable to delete recovery record during prepare for participant {0}
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3 [com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3] - Unable to delete recovery record at commit for participant {0}
*/
private void commitDecision()
{
- final State current ;
+ State current ;
+ boolean rollbackRequired = false;
+ boolean deleteRequired = false;
+
synchronized(this)
{
current = state ;
@@ -327,37 +394,97 @@
if (current == State.STATE_PREPARING)
{
+ // ok, we need to write the recovery details to log and send prepared.
+ // if we cannot write the log then we have to rollback the participant
+ // and send aborted.
if (participant instanceof Durable2PCParticipant) {
// write a durable participant recovery record to the persistent store
Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
ATParticipantRecoveryRecord recoveryRecord = new ATParticipantRecoveryRecord(id, durableParticipant, coordinator);
+
if (!XTSATRecoveryManager.getRecoveryManager().writeParticipantRecoveryRecord(recoveryRecord)) {
- // we need to revert the state to PREPARING
- state = State.STATE_PREPARING;
- return;
+ // we need to rollback and send aborted unless some other thread
+ // gets there first
+ rollbackRequired = true;
}
}
- sendPrepared() ;
+ // recheck state in case a rollback or readonly came in while we were writing the
+ // log record
+ synchronized (this) {
+ current = state;
+
+ if (current == State.STATE_PREPARED_SUCCESS) {
+ if (rollbackRequired) {
+ // if we change state to aborting then we are responsible for
+ // calling rollback and sending aborted but we have no log record
+ // to delete
+ state = State.STATE_ABORTING;
+ } else {
+ // this ensures any subsequent commit or rollback deletes the log record
+ // so we still have no log record to delete here
+ persisted = true;
+ }
+ } else if (!rollbackRequired) {
+ // an incoming rollback or readonly changed the state to aborting or null so
+ // it will already have performed a rollback if required but we need to
+ // delete the log record since the rollback/readonly thread did not know
+ // about it
+ deleteRequired = true;
+ }
+ }
+
+ if (rollbackRequired)
+ {
+ // we need to do the rollback and send aborted
+
+ executeRollback();
+
+ sendAborted();
+ forget();
+ } else if (deleteRequired) {
+ // just try to delete the log entry -- any required aborted has already been sent
+
+ if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+ // hmm, could not delete entry -- log a warning
+ if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_2", new Object[] {id}) ;
+ }
+ }
+ } else {
+ // whew got through -- send a prepared
+ sendPrepared() ;
+ }
}
else if (current == State.STATE_COMMITTING)
{
- if (participant instanceof Durable2PCParticipant) {
+ if (persisted && participant instanceof Durable2PCParticipant) {
// remove any durable participant recovery record from the persistent store
Durable2PCParticipant durableParticipant =(Durable2PCParticipant) participant;
// if we cannot delete the participant we effectively drop the commit message
- // here in the hope that we have better luck next time. the delete call will
- // already have logged a warning so there is no else branch here.
- if (XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
- sendCommitted();
- forget();
+ // here in the hope that we have better luck next time.
+
+ if (!XTSATRecoveryManager.getRecoveryManager().deleteParticipantRecoveryRecord(id)) {
+ // hmm, could not delete entry -- log a warning
+ if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.engines.ParticipantEngine.commitDecision_3", new Object[] {id}) ;
+ }
+ // now revert back to PREPARED_SUCCESS and drop message awaiting a retry
+
+ synchronized (this) {
+ state = State.STATE_PREPARED_SUCCESS;
+ }
+
+ return;
}
- } else {
- sendCommitted() ;
- forget() ;
}
+
+ sendCommitted() ;
+ forget() ;
}
}
More information about the jboss-svn-commits
mailing list