[jboss-svn-commits] JBL Code SVN: r25228 - in labs/jbosstm/trunk/XTS: WS-T/dev/src10/com/arjuna/wst/messaging/engines and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Feb 11 10:36:34 EST 2009
Author: adinn
Date: 2009-02-11 10:36:33 -0500 (Wed, 11 Feb 2009)
New Revision: 25228
Added:
labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/ATCrashDuringCommit.txt
Modified:
labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/util/TransportTimer.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorCompletionParticipantEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantCompletionParticipantEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorCompletionParticipantEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantCompletionParticipantEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantEngine.java
Log:
fixed JBTM-487
Modified: labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/util/TransportTimer.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/util/TransportTimer.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/util/TransportTimer.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -38,11 +38,16 @@
*/
private static long TIMEOUT = 30000 ;
/**
- * The transport period.
+ * The initial transport period.
*/
private static long PERIOD = 5000 ;
/**
+ * The maximum transport period.
+ */
+ private static long MAX_PERIOD = 300000 ;
+
+ /**
* Get the transport timer.
* @return The transport timer.
*/
@@ -70,12 +75,12 @@
}
/**
- * Set the transport period.
+ * Set the initial transport period.
* @param period The transport period in milliseconds.
*/
- public static void setTransportPeriod(final long period)
+ public static void setInitialTransportPeriod(final long period)
{
- PERIOD = period ;
+ PERIOD = period ;
}
/**
@@ -86,4 +91,28 @@
{
return PERIOD ;
}
+
+ /**
+ * Set the maximum transport period for engines which require an exponentially increasing period between
+ * message resends.
+ * @param period The transport period in milliseconds.
+ */
+ public static void setMaximumTransportPeriod(final long period)
+ {
+ MAX_PERIOD = period ;
+ }
+
+ /**
+ * Get the maximum transport period for engines which require an exponentially increasing period between
+ * message resends.
+ * @return The transport period in milliseconds.
+ */
+ public static long getMaximumTransportPeriod()
+ {
+ if (MAX_PERIOD < PERIOD) {
+ return PERIOD;
+ }
+
+ return MAX_PERIOD ;
+ }
}
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorCompletionParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorCompletionParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorCompletionParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -72,7 +72,32 @@
* The associated timer task or null.
*/
private TimerTask timerTask ;
+
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
+ * the amount of time we will wait for a response to a dispatched message
+ */
+ private long timeout;
+
+ /**
* true id this is a recovered participant otherwise false.
*/
private boolean recovered ;
@@ -111,6 +136,10 @@
this.state = state ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.timeout = TransportTimer.getTransportTimeout();
+ this.resendPeriod = this.initialResendPeriod;
}
/**
@@ -613,7 +642,7 @@
sendExit() ;
}
- return waitForState(State.STATE_EXITING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_EXITING, timeout) ;
}
/**
@@ -651,14 +680,14 @@
(current == State.STATE_FAULTING_ACTIVE))
{
sendFault("Fault called when state active/faulting active") ;
- return waitForState(State.STATE_FAULTING_ACTIVE, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAULTING_ACTIVE, timeout) ;
}
else if ((current == State.STATE_COMPENSATING) || (current == State.STATE_FAULTING_COMPENSATING))
{
sendFault("Fault called when state compensating/faulting compensating") ;
}
- return waitForState(State.STATE_FAULTING_COMPENSATING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAULTING_COMPENSATING, timeout) ;
}
/**
@@ -682,7 +711,7 @@
if (current == State.STATE_COMPLETED)
{
- sendCompleted() ;
+ sendCompleted(true) ;
}
}
@@ -708,11 +737,20 @@
}
/**
+ * Send the completed message
+ */
+
+ private void sendCompleted()
+ {
+ sendCompleted(false);
+ }
+
+ /**
* Send the completed message.
*
* @message com.arjuna.wst.messaging.engines.CoordinatorCompletionParticipantEngine.sendCompleted_1 [com.arjuna.wst.messaging.engines.CoordinatorCompletionParticipantEngine.sendCompleted_1] - Unexpected exception while sending Completed
*/
- private void sendCompleted()
+ private void sendCompleted(boolean timedOut)
{
final AddressingContext addressingContext = createContext() ;
try
@@ -727,9 +765,35 @@
}
}
+ // if we timed out then increase the resend period, otherwise make sure it is reset to the
+ // initial resend period
+
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the fault message.
* @param message The fault message.
@@ -1197,7 +1261,7 @@
commsTimeout(this) ;
}
} ;
- TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ TransportTimer.getTimer().schedule(timerTask, resendPeriod) ;
}
else
{
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantCompletionParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantCompletionParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantCompletionParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -71,7 +71,32 @@
* The associated timer task or null.
*/
private TimerTask timerTask ;
+
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
+ * the amount of time we will wait for a response to a dispatched message
+ */
+ private long timeout;
+
+ /**
* true id this is a recovered participant otherwise false.
*/
private boolean recovered ;
@@ -110,6 +135,10 @@
this.state = state ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.timeout = TransportTimer.getTransportTimeout();
+ this.resendPeriod = this.initialResendPeriod;
}
/**
@@ -551,7 +580,7 @@
sendExit() ;
}
- return waitForState(State.STATE_EXITING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_EXITING, timeout) ;
}
/**
@@ -587,14 +616,14 @@
if ((current == State.STATE_ACTIVE) || (current == State.STATE_FAULTING_ACTIVE))
{
sendFault("Fault called when state active/faulting active") ;
- return waitForState(State.STATE_FAULTING_ACTIVE, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAULTING_ACTIVE, timeout) ;
}
else if ((current == State.STATE_COMPENSATING) || (current == State.STATE_FAULTING_COMPENSATING))
{
sendFault("Fault called when state compensating/faulting compensating") ;
}
- return waitForState(State.STATE_FAULTING_COMPENSATING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAULTING_COMPENSATING, timeout) ;
}
/**
@@ -618,7 +647,7 @@
if (current == State.STATE_COMPLETED)
{
- sendCompleted() ;
+ sendCompleted(true) ;
}
}
@@ -644,11 +673,21 @@
}
/**
+ * Send the completed message
+ */
+
+ private void sendCompleted()
+ {
+ sendCompleted(false);
+ }
+
+ /**
* Send the completed message.
*
+ * @param timedOut true if this is in response to a comms timeout
* @message com.arjuna.wst.messaging.engines.ParticipantCompletionParticipantEngine.sendCompleted_1 [com.arjuna.wst.messaging.engines.ParticipantCompletionParticipantEngine.sendCompleted_1] - Unexpected exception while sending Completed
*/
- private void sendCompleted()
+ private void sendCompleted(boolean timedOut)
{
final AddressingContext addressingContext = createContext() ;
try
@@ -663,9 +702,35 @@
}
}
+ // if we timed out then increase the resend period, otherwise make sure it is reset to the
+ // initial resend period
+
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the fault message.
* @param message The fault message.
@@ -1026,7 +1091,7 @@
commsTimeout(this) ;
}
} ;
- TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ TransportTimer.getTimer().schedule(timerTask, resendPeriod) ;
}
else
{
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/ParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -35,7 +35,6 @@
import com.arjuna.webservices.wsat.NotificationType;
import com.arjuna.webservices.wsat.ParticipantInboundEvents;
import com.arjuna.webservices.wsat.State;
-import com.arjuna.webservices.wsat.AtomicTransactionConstants;
import com.arjuna.webservices.wsat.client.CoordinatorClient;
import com.arjuna.webservices.wsat.processors.ParticipantProcessor;
import com.arjuna.webservices.wscoor.CoordinationConstants;
@@ -72,6 +71,25 @@
private TimerTask timerTask ;
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
* true id this is a recovered participant otherwise false.
*/
private boolean recovered ;
@@ -107,6 +125,9 @@
this.coordinator = coordinator ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.resendPeriod = initialResendPeriod;
}
/**
@@ -174,6 +195,9 @@
if (current == State.STATE_ACTIVE)
{
state = State.STATE_PREPARING ;
+ } else if (current == State.STATE_PREPARED_SUCCESS) {
+ // hmm, client may have missed a prepared message -- reset the period
+ resendPeriod = TransportTimer.getTransportPeriod();
}
}
@@ -559,7 +583,7 @@
if (current == State.STATE_PREPARED_SUCCESS)
{
- sendPrepared() ;
+ sendPrepared(true) ;
}
}
@@ -702,14 +726,23 @@
}
}
}
-
+
/**
* Send the prepared message.
- *
- * @message com.arjuna.wst.messaging.engines.ParticipantEngine.sendPrepared_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.sendPrepared_1] - Unexpected exception while sending Prepared
*/
private void sendPrepared()
{
+ sendPrepared(false);
+ }
+
+ /**
+ * Send the prepared message.
+ *
+ * @param timedOut true if this is in response to a comms timeout
+ * @message com.arjuna.wst.messaging.engines.ParticipantEngine.sendPrepared_1 [com.arjuna.wst.messaging.engines.ParticipantEngine.sendPrepared_1] - Unexpected exception while sending Prepared
+ */
+ private void sendPrepared(boolean timedOut)
+ {
final AddressingContext responseAddressingContext = createContext() ;
final InstanceIdentifier instanceIdentifier = new InstanceIdentifier(id) ;
try
@@ -723,10 +756,33 @@
WSTLogger.arjLoggerI18N.debug("com.arjuna.wst.messaging.engines.ParticipantEngine.sendPrepared_1", th) ;
}
}
-
+
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the aborted message.
*
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorCompletionParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorCompletionParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorCompletionParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -74,7 +74,32 @@
* The associated timer task or null.
*/
private TimerTask timerTask ;
+
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
+ * the amount of time we will wait for a response to a dispatched message
+ */
+ private long timeout;
+
+ /**
* true if this participant has been recovered otherwise false
*/
private boolean recovered;
@@ -115,6 +140,10 @@
this.state = state ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.timeout = TransportTimer.getTransportTimeout();
+ this.resendPeriod = this.initialResendPeriod;
}
/**
@@ -656,7 +685,7 @@
(current == State.STATE_EXITING))
{
sendExit() ;
- return waitForState(State.STATE_EXITING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_EXITING, timeout) ;
}
return current ;
}
@@ -705,22 +734,22 @@
if (current == State.STATE_ACTIVE)
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_ACTIVE, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_ACTIVE, timeout) ;
}
else if (current == State.STATE_CANCELING)
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_CANCELING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_CANCELING, timeout) ;
}
else if (current == State.STATE_COMPLETING)
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_COMPLETING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_COMPLETING, timeout) ;
}
else if (current == State.STATE_COMPENSATING)
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_COMPENSATING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_COMPENSATING, timeout) ;
}
return current ;
@@ -759,7 +788,7 @@
(current == State.STATE_NOT_COMPLETING))
{
sendCannotComplete() ;
- return waitForState(State.STATE_NOT_COMPLETING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_NOT_COMPLETING, timeout) ;
}
return current ;
}
@@ -785,7 +814,7 @@
if (current == State.STATE_COMPLETED)
{
- sendCompleted() ;
+ sendCompleted(true) ;
}
}
@@ -812,10 +841,19 @@
/**
* Send the completed message.
+ */
+ private void sendCompleted()
+ {
+ sendCompleted(false);
+ }
+
+ /**
+ * Send the completed message.
*
+ * @param timedOut true if this is in response to a comms timeout
* @message com.arjuna.wst11.messaging.engines.CoordinatorCompletionParticipantEngine.sendCompleted_1 [com.arjuna.wst11.messaging.engines.CoordinatorCompletionParticipantEngine.sendCompleted_1] - Unexpected exception while sending Completed
*/
- private void sendCompleted()
+ private void sendCompleted(boolean timedOut)
{
final AddressingProperties addressingProperties = createContext() ;
try
@@ -830,9 +868,35 @@
}
}
+ // if we timed out then increase the resend period, otherwise make sure it is reset to the
+ // initial resend period
+
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the fail message.
* @param message The fail message.
@@ -1330,7 +1394,7 @@
commsTimeout(this) ;
}
} ;
- TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ TransportTimer.getTimer().schedule(timerTask, resendPeriod) ;
}
else
{
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantCompletionParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantCompletionParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantCompletionParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -74,7 +74,32 @@
* The associated timer task or null.
*/
private TimerTask timerTask ;
+
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
+ * the amount of time we will wait for a response to a dispatched message
+ */
+ private long timeout;
+
+ /**
* true if this participant has been recovered otherwise false
*/
private boolean recovered;
@@ -114,6 +139,10 @@
this.state = state ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.timeout = TransportTimer.getTransportTimeout();
+ this.resendPeriod = initialResendPeriod;
}
/**
@@ -581,7 +610,7 @@
sendExit() ;
}
- return waitForState(State.STATE_EXITING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_EXITING, timeout) ;
}
/**
@@ -622,17 +651,17 @@
if ((current == State.STATE_ACTIVE) || (current == State.STATE_FAILING_ACTIVE))
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_ACTIVE, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_ACTIVE, timeout) ;
}
else if ((current == State.STATE_CANCELING) || (current == State.STATE_FAILING_CANCELING))
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_CANCELING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_CANCELING, timeout) ;
}
else if ((current == State.STATE_COMPENSATING) || (current == State.STATE_FAILING_COMPENSATING))
{
sendFail(exceptionIdentifier) ;
- return waitForState(State.STATE_FAILING_COMPENSATING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_FAILING_COMPENSATING, timeout) ;
}
return current ;
@@ -668,7 +697,7 @@
if ((current == State.STATE_ACTIVE) || (current == State.STATE_NOT_COMPLETING))
{
sendCannotComplete() ;
- return waitForState(State.STATE_NOT_COMPLETING, TransportTimer.getTransportTimeout()) ;
+ return waitForState(State.STATE_NOT_COMPLETING, timeout) ;
}
return current ;
}
@@ -694,7 +723,7 @@
if (current == State.STATE_COMPLETED)
{
- sendCompleted() ;
+ sendCompleted(true) ;
}
}
@@ -720,11 +749,21 @@
}
/**
+ * Send the completed message
+ */
+
+ private void sendCompleted()
+ {
+ sendCompleted(false);
+ }
+
+ /**
* Send the completed message.
*
+ * @param timedOut true if this is in response to a comms timeout
* @message com.arjuna.wst11.messaging.engines.ParticipantCompletionParticipantEngine.sendCompleted_1 [com.arjuna.wst11.messaging.engines.ParticipantCompletionParticipantEngine.sendCompleted_1] - Unexpected exception while sending Completed
*/
- private void sendCompleted()
+ private void sendCompleted(boolean timedOut)
{
final AddressingProperties addressingProperties = createContext() ;
try
@@ -739,9 +778,35 @@
}
}
+ // if we timed out then increase the resend period, otherwise make sure it is reset to the
+ // initial resend period
+
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the fail message.
* @param message The fail message.
@@ -1134,7 +1199,7 @@
commsTimeout(this) ;
}
} ;
- TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ TransportTimer.getTimer().schedule(timerTask, resendPeriod) ;
}
else
{
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantEngine.java 2009-02-11 15:20:41 UTC (rev 25227)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/ParticipantEngine.java 2009-02-11 15:36:33 UTC (rev 25228)
@@ -50,7 +50,27 @@
* The associated timer task or null.
*/
private TimerTask timerTask ;
+
/**
+ * the time which will elapse before the next message resend. this is incrementally increased
+ * until it reaches maxResendPeriod
+ */
+ private long resendPeriod;
+
+ /**
+ * the initial period we will allow between resends.
+ */
+ private long initialResendPeriod;
+
+ /**
+ * the maximum period we will allow between resends. n.b. the coordinator uses the value returned
+ * by getTransportTimeout as the limit for how long it waits for a response. however, we can still
+ * employ a max resend period in excess of this value. if a message comes in after the coordinator
+ * has given up it will catch it on the next retry.
+ */
+ private long maxResendPeriod;
+
+ /**
* true if this participant has been recovered otherwise false
*/
private boolean recovered;
@@ -86,6 +106,9 @@
this.coordinator = coordinator ;
this.recovered = recovered;
this.persisted = recovered;
+ this.initialResendPeriod = TransportTimer.getTransportPeriod();
+ this.maxResendPeriod = TransportTimer.getMaximumTransportPeriod();
+ this.resendPeriod = this.initialResendPeriod;
}
/**
@@ -544,6 +567,15 @@
return;
}
+ // scale the resend period by ~1.4 (roughly doubling every two resends) up to our maximum limit
+
+ if (resendPeriod < maxResendPeriod) {
+ resendPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (resendPeriod > maxResendPeriod) {
+ resendPeriod = maxResendPeriod;
+ }
+ }
current = state ;
}
@@ -696,10 +728,20 @@
/**
* Send the prepared message.
*
- * @message com.arjuna.wst11.messaging.engines.ParticipantEngine.sendPrepared_1 [com.arjuna.wst11.messaging.engines.ParticipantEngine.sendPrepared_1] - Unexpected exception while sending Prepared
*/
private void sendPrepared()
{
+ sendPrepared(false);
+ }
+
+ /**
+ * Send the prepared message.
+ *
+ * @param timedOut true if this is in response to a comms timeout
+ * @message com.arjuna.wst11.messaging.engines.ParticipantEngine.sendPrepared_1 [com.arjuna.wst11.messaging.engines.ParticipantEngine.sendPrepared_1] - Unexpected exception while sending Prepared
+ */
+ private void sendPrepared(boolean timedOut)
+ {
final AddressingProperties responseAddressingContext = createContext() ;
final InstanceIdentifier instanceIdentifier = new InstanceIdentifier(id) ;
try
@@ -714,9 +756,32 @@
}
}
+ updateResendPeriod(timedOut);
+
initiateTimer() ;
}
+ private synchronized void updateResendPeriod(boolean timedOut)
+ {
+ // if we timed out then we multiply the resend period by ~= sqrt(2) up to the maximum
+ // if not we make sure it is reset to the initial period
+
+ if (timedOut) {
+ if (resendPeriod < maxResendPeriod) {
+ long newPeriod = resendPeriod * 14 / 10; // approximately doubles every two resends
+
+ if (newPeriod > maxResendPeriod) {
+ newPeriod = maxResendPeriod;
+ }
+ resendPeriod = newPeriod;
+ }
+ } else {
+ if (resendPeriod > initialResendPeriod) {
+ resendPeriod = initialResendPeriod;
+ }
+ }
+ }
+
/**
* Send the aborted message.
*
@@ -778,7 +843,7 @@
commsTimeout(this) ;
}
} ;
- TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ TransportTimer.getTimer().schedule(timerTask, resendPeriod) ;
}
else
{
Copied: labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/ATCrashDuringCommit.txt (from rev 25158, labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/BACrashDuringCommit.txt)
===================================================================
--- labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/ATCrashDuringCommit.txt (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/ATCrashDuringCommit.txt 2009-02-11 15:36:33 UTC (rev 25228)
@@ -0,0 +1,362 @@
+##############################################################################
+# JBoss, Home of Professional Open Source
+# Copyright 2009, Red Hat Middleware LLC, and individual contributors
+# by the @authors tag. See the copyright.txt in the distribution for a
+# full listing of individual contributors.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2.1 of
+# the License, or (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free
+# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+#
+# @authors Andrew Dinn
+#
+# AT Crash During Commit
+#
+# This script automates testing of a specific recovery scenario for the
+# JBossTS XTS implementation of the WS-AT 1.1 protocol using orchestration
+# rules. The basic scenario employs a coordinator and 3 web services
+# running in a single JVM but other variants are possible (see below). The
+# scenario is as follows (** indicates intercession by a TOAST rule):
+#
+# AS boots
+# Client starts a WS-AT activity
+# Client invokes web service 1
+# Web service 1 registers participant P1
+# Client invokes web service 2
+# Web service 2 registers participant P2
+# Client invokes web service 3
+# Web service 3 registers participant P3
+#
+# Client initiates transaction commit
+#
+# Coordinator initiates prepare of participant P1
+# ** Rule system logs dispatch of prepare to P1
+# ** Rule system logs receipt of prepared from P1
+# Coordinator initiates prepare of participant P2
+# ** Rule system logs dispatch of prepare to P2
+# ** Rule system logs receipt of prepared from P2
+# Coordinator initiates prepare of participant P3
+# ** Rule system logs dispatch of prepare to P3
+# ** Rule system logs receipt of prepared from P3
+#
+# Coordinator initiates commit of participant P1
+# ** Rule system intercepts commit and crashes JVM
+#
+# AS reboots
+# Recovery system starts after 2 minutes
+# Recovery system recreates PREPARED WS-AT activity coordinator
+# ** Rule system traces create
+# Recovery system recreates participant stub for P1
+# ** Rule system traces create
+# Recovery system recreates participant stub for P2
+# ** Rule system traces create
+# Recovery system recreates participant stub for P3
+# ** Rule system traces create
+# Recovery system calls replay of PREPARED transaction
+# ** Rule system traces PREPARED replay invocation
+#
+# Coordinator sends commit to P1
+# P1 replies with committed
+# ** Rule system traces receipt of committed
+#
+# Coordinator sends commit to P2
+# P2 replies with committed
+# ** Rule system traces receipt of committed
+#
+# Coordinator sends commit to P3
+# P3 replies with committed
+# ** Rule system traces receipt of committed
+#
+# Coordinator clears log record and completes commit
+# ** Rule system detects completed commit and kills JVM
+#
+# Use of this script
+#
+# The default way of exercising this test is to deploy the xtstest war
+# to a single AS and configure it to run the relevant XTS Service Test.
+# The web services and coordinator will be located with the client.
+# The number of participants can actually be 2, 3 or more. The
+# web service(s), client (i.e. the XTS Service Test which drives
+# the test) and coordinator service can optionally be located in separate
+# ASs. It is also possible to use just a single web service and have the
+# client register multiple participants. The coordinator AS should crash
+# when the client commits. At reboot the rest of the test should run
+# automatically and the server should be killed after the prepared
+# transaction is successfully replayed.
+#
+# n.b. this test is not appropriate for the case where only one participant
+# is registered since the coordinator employs the one phase optimization,
+# omitting to log the activity. In this case the lone participant should be
+# automatically compensated at restart.
+#
+# This script needs to be passed to a TOAST agent in the JVM running
+# the coordinator service both at first boot and at reboot. Output will be
+# written to file testlog in the working directory of the AS. n.b. the rules in
+# this script only refer to code executed by the coordinator. If the client
+# (the selected XTS Service Test) or the web services are located in another
+# AS/JVM then the other JVM does not require a TOAST agent or script.
+#
+# XTS Service tests which can operate with this scenario can be selected for
+# execution at AS boot by the XTSServiceTestRunnerBean by setting system
+# property
+# org.jboss.jbossts.xts.servicetests.XTSServiceTestName
+# to the name of a class which will execute the test. This property must
+# be defined in the JVM running the AS to which the xtstest war is deployed
+# i.e. the client AS. n.b. if the client is colocated with the coordinator then
+# this property must be left undefined at AS reboot otherwise the client
+# will run again, starting a new TX which may interfere with recovery of the
+# crashed TX.
+#
+# Available tests include:
+#
+# Tests still to be written include:
+#
+# org.jboss.jbossts.xts.servicetests.test.ATMultiParticipantPrepareAndCommitTest
+# this test invokes a single service registering 3 coordinator completion participants
+# the location of the service is defined by a system property:
+# org.jboss.jbossts.xts.servicetests.ServiceURL1
+# if this is not set the value used defaults to
+# http://localhost:8080/xtstest/xtsservicetest1
+#
+# org.jboss.jbossts.xts.servicetests.test.ATMultiServicePrepareAndCommitTest
+# this test invokes 3 separate services registering a coordinator completion participant with each service
+# the location of each service is defined by system properties:
+# org.jboss.jbossts.xts.servicetests.ServiceURL1
+# org.jboss.jbossts.xts.servicetests.ServiceURL2
+# org.jboss.jbossts.xts.servicetests.ServiceURL3
+# if these are not set the values used default to
+# http://localhost:8080/xtstest/xtsservicetest1
+# http://localhost:8080/xtstest/xtsservicetest2
+# http://localhost:8080/xtstest/xtsservicetest3
+#
+# If the client is run in a different AS to the coordinator then the client
+# AS needs to be pointed at the coordinator AS. The easiest
+# way to do this is to define the system property
+#
+# org.jboss.jbossts.xts11.coordinatorURL
+#
+# to something like
+#
+# http://foo.bar.org:8080/ws-c11/soap/ActivationCoordinator
+#
+# or alternatively to redefine one or more of the component properties
+#
+# org.jboss.jbossts.xts11.coordinator.host
+# org.jboss.jbossts.xts11.coordinator.port
+# org.jboss.jbossts.xts11.coordinator.path
+#
+# (you probably only need to reset the host component)
+#
+# Expected output
+#
+# After the first boot the JVM should exit leaving the following in file testlog
+#
+# prepared received for participant XXXXXX
+# prepared received for participant XXXXXX
+# prepared received for participant XXXXXX
+# JVM exit
+#
+# n.b. there should be at least one prepared message received for each participant
+# and in some cases there may be repeat messages
+#
+# After reboot the JVM should exit leaving output in the following format in file
+# testlog.
+#
+# prepared received for participant XXXXXX
+# prepared received for participant XXXXXX
+# prepared received for participant XXXXXX
+# JVM exit
+# committed received for participant XXXXXX
+# committed received for participant XXXXXX
+# committed received for participant XXXXXX
+#
+#######################################################################
+# This rule opens a file for the trace output during XTS startup
+# It will be opened for append at reboot so messages from both runs
+# will go to this file
+#
+RULE open trace file
+CLASS org.jboss.jbossts.XTSService
+METHOD start()
+BIND NOTHING
+IF TRUE
+DO openTrace("log", "testlog")
+ENDRULE
+
+#######################################################################
+## rules for first run of AS
+
+#######################################################################
+# This rule is triggered when a non-recovered coordinator engine
+# (CoordinatorEngine) is sent a commit message. It exits the JVM,
+# simulating a crash. The trigger location is on entry
+
+RULE kill JVM at participant completion close
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorEngine
+METHOD close
+AT ENTRY
+BIND engine:CoordinatorEngine = $0,
+ recovered:boolean = engine.isRecovered(),
+ identifier:String = engine.getId()
+IF (NOT recovered)
+ AND
+ debug("close on non-recovered coordinator engine " + identifier)
+DO traceln("log", "JVM exit"),
+ debug("!!!killing JVM!!!"),
+ killJVM()
+ENDRULE
+
+#######################################################################
+# This rule is triggered when a non-recovered coordinator engine
+# (CoordinatorEngine) is requested to send a prepare message. It
+# traces the call.
+
+RULE trace coordinator completion close
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorEngine
+METHOD prepare
+AFTER SYNCHRONIZE
+BIND engine:CoordinatorEngine = $0,
+ recovered:boolean = engine.isRecovered(),
+ identifier:String = engine.getId()
+IF NOT recovered
+DO debug("close on recovered coordinator engine " + identifier),
+ traceln("log", "close on recovered coordinator engine " + identifier)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when a non-recovered coordinator engine
+# (CoordinatorEngine) receives a prepared message. It traces the call.
+
+RULE trace coordinator completion completed
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorEngine
+METHOD prepared(NotificationType, AddressingProperties, ArjunaContext)
+AT ENTRY
+BIND engine:CoordinatorEngine = $0,
+ recovered:boolean = engine.isRecovered(),
+ identifier:String = engine.getId()
+IF NOT recovered
+DO debug("received completed message for coordinator engine " + identifier),
+ traceln("log", "received completed message for coordinator engine " + identifier)
+ENDRULE
+
+#######################################################################
+## rules for reboot run of AS
+
+#######################################################################
+# This rule is triggered when a coordinator engine (CoordinatorEngine)
+# is created from details located in the log record. It traces the
+# create operation. The trigger location is at entry but the rule
+# should only be triggered after calling the super constructor
+RULE trace participant completion engine create
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorEngine
+METHOD <init>(String, boolean, W3CEndpointReference, State, boolean)
+AT ENTRY
+BIND identifier = $1,
+ recovered=$4
+IF recovered
+DO debug("created recovered coordinator engine " + identifier),
+ trace("log", "created recovered coordinator engine " + identifier)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when a coordinator engine
+# (CoordinatorCompletionCoordinatorEngine) is created from details
+# located in the log record. It traces the create operation
+# The trigger location is at entry but the rule should only be triggered
+# after calling the super constructor
+RULE trace coordinator completion engine create
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorEngine
+METHOD <init>(String, boolean, W3CEndpointReference, State, boolean)
+AT ENTRY
+BIND identifier = $1,
+ recovered=$4
+IF recovered
+DO debug("created recovered coordinator engine " + identifier),
+ trace("log", "created recovered coordinator engine " + identifier)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when a recovered coordinator engine
+# (CoordinatorEngine) is requested to send a close message. This
+# happens during replay of a prepared TX from
+# the log. It traces the call.
+
+RULE trace participant completion close
+CLASS com.arjuna.wst11.messaging.engines.ParticipantCompletionCoordinatorEngine
+METHOD close
+AFTER SYNCHRONIZE
+BIND engine:CoordinatorEngine = $0,
+ recovered:boolean = engine.isRecovered(),
+ identifier:String = engine.getId()
+IF recovered
+DO debug("close on recovered participant completion participant engine " + identifier),
+ traceln("log", "close on recovered participant completion participant engine " + identifier)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when a recovered coordinator completion
+# participant stub (CoordinatorCompletionCoordinatorEngine) is requested
+# to send a close message. This happens during replay of a prepared TX from
+# the log. It traces the call.
+
+RULE trace coordinator completion close
+CLASS com.arjuna.wst11.messaging.engines.CoordinatorCompletionCoordinatorEngine
+METHOD close
+AFTER SYNCHRONIZE
+BIND engine:CoordinatorEngine = $0,
+ recovered:boolean = engine.isRecovered(),
+ identifier:String = engine.getId()
+IF recovered
+DO debug("close on recovered coordinator completion participant engine " + identifier),
+ traceln("log", "close on recovered coordinator completion participant engine " + identifier)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when the recovery system finds a PREPARED
+# activity in the log and reruns the phase 2 commit operation.
+# It prints a message which can be used to verify that the test has
+# progressed as expected.
+
+RULE trace prepared replay
+CLASS org.jboss.jbossts.xts.recovery.coordinator.ba.RecoverACCoordinator
+METHOD replayPhase2
+AT INVOKE phase2Commit
+BIND coordinator = $0,
+ uid : Uid = coordinator.identifier(),
+ status : int = coordinator.status()
+IF (status == com.arjuna.ats.arjuna.coordinator.ActionStatus.PREPARED)
+ OR
+ (status == com.arjuna.ats.arjuna.coordinator.ActionStatus.COMMITTING)
+DO debug("replaying close for prepared activity " + uid),
+ traceln("log", "replaying close for prepared activity " + uid)
+ENDRULE
+
+#######################################################################
+# This rule is triggered when the recovery system deletes the COMMITTED
+# activity from the log. It prints a message which can be used to
+# verify that the test has completed. As a convenience it also kills
+# the JVM to halt the test.
+
+RULE trace remove committed state
+CLASS com.arjuna.ats.arjuna.coordinator.BasicAction
+METHOD updateState
+AFTER CALL remove_committed
+BIND action : BasicAction = $0,
+ uid = action.get_uid()
+IF TRUE
+DO traceln("log", "removed committed activity " + uid),
+ debug("removed committed transaction " + uid),
+ debug("!!!killing JVM!!!"),
+ killJVM()
+ENDRULE
Property changes on: labs/jbosstm/trunk/XTS/sar/tests/dd/scripts/ATCrashDuringCommit.txt
___________________________________________________________________
Name: svn:mergeinfo
+
More information about the jboss-svn-commits
mailing list