[jboss-svn-commits] JBL Code SVN: r20645 - in labs/jbosstm/trunk/XTS: WS-C/dev/src/com/arjuna/webservices/base/processors and 12 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Jun 19 12:24:37 EDT 2008
Author: adinn
Date: 2008-06-19 12:24:37 -0400 (Thu, 19 Jun 2008)
New Revision: 20645
Added:
labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ReactivatedObjectProcessor.java
Modified:
labs/jbosstm/trunk/XTS/WS-C/build.xml
labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ActivatedObjectProcessor.java
labs/jbosstm/trunk/XTS/WS-T/build.xml
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java
labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java
labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java
Log:
Implemented top-down recovery for WSAT coordinators from the coordinator side for both WSAT 1.0 and 1.1. i.e. if the WSAT coordinator crashes and restarts it will recover and commit previosuly prepared transactions (assuming the participants are still running). fix for JBTM-346
Modified: labs/jbosstm/trunk/XTS/WS-C/build.xml
===================================================================
--- labs/jbosstm/trunk/XTS/WS-C/build.xml 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-C/build.xml 2008-06-19 16:24:37 UTC (rev 20645)
@@ -328,9 +328,7 @@
<!--
<fileset dir="${com.arjuna.jta.install.ext}" includes="${jta.ext.jars}"/>
-->
- <!--
<fileset dir="${com.arjuna.xts.ext}" includes="${tests.libs}"/>
- -->
<!--
<fileset dir="${com.arjuna.jta.install.lib}" includes="${jta.lib.jars}"/>
<fileset dir="${build.dev.lib.dir}" includes="ws-c10.jar ws-c.jar"/>
Modified: labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ActivatedObjectProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ActivatedObjectProcessor.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ActivatedObjectProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -26,16 +26,16 @@
/**
* This class manages the association between an object and its identifier.
*/
-public final class ActivatedObjectProcessor
+public class ActivatedObjectProcessor
{
/**
* The identifier to object map.
*/
- private Map objectMap = new HashMap() ;
+ protected Map objectMap = new HashMap() ;
/**
* The object to identifier map.
*/
- private Map identifierMap = new HashMap() ;
+ protected Map identifierMap = new HashMap() ;
/**
* Activate the object.
Added: labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ReactivatedObjectProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ReactivatedObjectProcessor.java (rev 0)
+++ labs/jbosstm/trunk/XTS/WS-C/dev/src/com/arjuna/webservices/base/processors/ReactivatedObjectProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -0,0 +1,126 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a full listing
+ * of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU General Public License, v. 2.0.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License,
+ * v. 2.0 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2008,
+ * @author JBoss Inc.
+ */
+package com.arjuna.webservices.base.processors;
+
+/**
+ * A specialization of ActivatedObjectProcessor which allows for ghost entries to
+ * be left in the table after deletion. A ghost entry cannot be retrieved by a normal
+ * getObject(id) which will return null, indicating that no object with the supplied id
+ * exists. However, the ghost's presence can be detected using getGhost(id).</p>
+ *
+ * Ghost entries are used to identify objects which have failed to be terminated due to an
+ * unavailable participant or coordinator and so are still present in an unprocessed log record.
+ * When recovery processing recreates a participant the recovered instance replaces the
+ * ghost entry, ensuring that sbsequent messages update the participant whose recovery is
+ * being driven by the coordinator.
+ */
+
+public class ReactivatedObjectProcessor extends ActivatedObjectProcessor {
+
+ /**
+ * a private object used to identify a ghost entry
+ */
+
+ static final private Object tombstone = new Object();
+
+ /**
+ * Activate the object.
+ *
+ * @param object The object.
+ * @param identifier The identifier.
+ */
+ public synchronized void activateObject(Object object, String identifier) {
+ super.activateObject(object, identifier);
+ }
+
+ /**
+ * Deactivate the object.
+ *
+ * @param object The object.
+ */
+ public synchronized void deactivateObject(Object object) {
+ deactivateObject(object, false);
+ }
+
+ /**
+ * Deactivate the object.
+ *
+ * @param object The object.
+ */
+ public synchronized void deactivateObject(Object object, boolean leaveGhost) {
+ if (leaveGhost) {
+ final String identifier = (String)identifierMap.get(object);
+ super.deactivateObject(object);
+ objectMap.put(identifier, tombstone);
+ } else {
+ super.deactivateObject(object);
+ }
+ }
+
+ /**
+ * Get the object with the specified identifier.
+ *
+ * @param identifier The identifier.
+ * @return The participant or null if not known.
+ */
+ public synchronized Object getObject(String identifier) {
+ final Object object = super.getObject(identifier);
+
+ if (object == tombstone) {
+ return null;
+ }
+
+ return object;
+ }
+
+ /**
+ * check if there is a ghost entry for this object
+ *
+ * @param identifier
+ * @return true iff there is a ghost entry for this object
+ */
+ public synchronized boolean getGhost(String identifier)
+ {
+ if (reactivationProcessingStarted) {
+ final Object object = super.getObject(identifier);
+ return (object == tombstone);
+ } else {
+ // until we have been notified of at least one complete recovery scan pass we have
+ // to assume that any identifier may have an entry in the log so we return true
+ return true;
+ }
+ }
+
+ /**
+ * a global flag which is false at boot and is set to true once a recovery log scan for XTS
+ * data has completed
+ */
+ static boolean reactivationProcessingStarted = false;
+
+ /**
+ * notify completion of a recovery log scan for XTS data
+ */
+
+ static public void setReactivationProcessingStarted()
+ {
+ reactivationProcessingStarted = true;
+ }
+}
Modified: labs/jbosstm/trunk/XTS/WS-T/build.xml
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/build.xml 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/build.xml 2008-06-19 16:24:37 UTC (rev 20645)
@@ -367,9 +367,7 @@
<!--
<fileset dir="${com.arjuna.jta.install.ext}" includes="${jta.ext.jars}"/>
-->
- <!--
<fileset dir="${com.arjuna.xts.ext}" includes="${tests.libs}"/>
- -->
<!--
<fileset dir="${com.arjuna.jta.install.lib}" includes="${jta.lib.jars}"/>
<fileset dir="${build.dev.lib.dir}" includes="ws-t.jar ws-t10.jar"/>
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -68,8 +68,10 @@
/**
* Deactivate the coordinator.
* @param coordinator The coordinator.
+ * @param leaveGhost true if a ghost activation entry should be left to indicate that the
+ * coordinator exists in a log entry and will be recovered at some later date
*/
- public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator) ;
+ public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost) ;
/**
* Aborted.
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -24,6 +24,7 @@
import com.arjuna.webservices.SoapFault10;
import com.arjuna.webservices.SoapFaultType;
import com.arjuna.webservices.base.processors.ActivatedObjectProcessor;
+import com.arjuna.webservices.base.processors.ReactivatedObjectProcessor;
import com.arjuna.webservices.logging.WSTLogger;
import com.arjuna.webservices.wsaddr.AddressingContext;
import com.arjuna.webservices.wsaddr.AttributedURIType;
@@ -46,7 +47,7 @@
/**
* The activated object processor.
*/
- private final ActivatedObjectProcessor activatedObjectProcessor = new ActivatedObjectProcessor() ;
+ private final ReactivatedObjectProcessor activatedObjectProcessor = new ReactivatedObjectProcessor() ;
/**
* Activate the coordinator.
@@ -61,12 +62,14 @@
/**
* Deactivate the coordinator.
* @param coordinator The coordinator.
+ * @param leaveGhost true if a ghost activation entry should be left to indicate that the
+ * coordinator exists in a log entry and will be recovered at some later date
*/
- public void deactivateCoordinator(final CoordinatorInboundEvents coordinator)
+ public void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost)
{
- activatedObjectProcessor.deactivateObject(coordinator) ;
+ activatedObjectProcessor.deactivateObject(coordinator, leaveGhost) ;
}
-
+
/**
* Get the coordinator with the specified identifier.
* @param instanceIdentifier The coordinator identifier.
@@ -79,6 +82,17 @@
}
/**
+ * Tests if there is a ghost entry with the specified identifier.
+ * @param instanceIdentifier The coordinator identifier.
+ * @return true if there is a ghost entry.
+ */
+ private boolean getGhostCoordinator(final InstanceIdentifier instanceIdentifier)
+ {
+ final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
+ return activatedObjectProcessor.getGhost(identifier) ;
+ }
+
+ /**
* Aborted.
* @param aborted The aborted notification.
* @param addressingContext The addressing context.
@@ -121,6 +135,7 @@
*
* @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_1 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_1] - Unexpected exception thrown from committed:
* @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2] - Committed called on unknown coordinator: {0}
+ * @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3] - Ignoring committed called on unidentified coordinator until recovery pass is complete: {0}
*/
public void committed(final NotificationType committed, final AddressingContext addressingContext,
final ArjunaContext arjunaContext)
@@ -144,7 +159,11 @@
}
else if (WSTLogger.arjLoggerI18N.isWarnEnabled())
{
- WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
+ if (!getGhostCoordinator(instanceIdentifier)) {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
+ } else {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3", new Object[] {instanceIdentifier}) ;
+ }
}
}
@@ -156,6 +175,7 @@
*
* @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_1 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_1] - Unexpected exception thrown from prepared:
* @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_2 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_2] - Prepared called on unknown coordinator: {0}
+ * @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_3 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_3] - Ignoring prepared called on unidentified coordinator until recovery pass is complete: {0}
*/
public void prepared(final NotificationType prepared, final AddressingContext addressingContext,
final ArjunaContext arjunaContext)
@@ -177,13 +197,13 @@
}
}
}
- else
+ else if (!getGhostCoordinator(instanceIdentifier))
{
if (WSTLogger.arjLoggerI18N.isWarnEnabled())
{
- WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_2", new Object[] {instanceIdentifier}) ;
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_2", new Object[] {instanceIdentifier}) ;
}
-
+
final String identifierValue = instanceIdentifier.getInstanceIdentifier() ;
if ((identifierValue != null) && (identifierValue.length() > 0) && (identifierValue.charAt(0) == 'D'))
{
@@ -194,6 +214,16 @@
sendInvalidState(addressingContext, arjunaContext) ;
}
}
+ else
+ {
+ // there may be a participant stub waitinng to be recovered from the log so drop the
+ // message, forcing the caller to retry
+
+ if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.prepared_3", new Object[] {instanceIdentifier}) ;
+ }
+ }
}
/**
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -70,6 +70,10 @@
*/
private State state ;
/**
+ * The flag indicating that this coordinator has been recovered from the log.
+ */
+ private boolean recovered ;
+ /**
* The flag indicating a read only response.
*/
private boolean readOnly ;
@@ -86,7 +90,7 @@
*/
public CoordinatorEngine(final String id, final boolean durable, final EndpointReferenceType participant)
{
- this(id, durable, participant, State.STATE_ACTIVE) ;
+ this(id, durable, participant, false, State.STATE_ACTIVE) ;
}
/**
@@ -96,13 +100,15 @@
* @param participant The participant endpoint reference.
* @param state The initial state.
*/
- public CoordinatorEngine(final String id, final boolean durable, final EndpointReferenceType participant, final State state)
+ public CoordinatorEngine(final String id, final boolean durable, final EndpointReferenceType participant, boolean recovered, final State state)
{
this.id = id ;
this.instanceIdentifier = new InstanceIdentifier(id) ;
this.durable = durable ;
this.participant = participant ;
+ this.recovered = recovered;
this.state = state ;
+
CoordinatorProcessor.getProcessor().activateCoordinator(this, id) ;
}
@@ -316,17 +322,20 @@
sendPrepare() ;
}
- final State result = waitForState(State.STATE_PREPARING, TransportTimer.getTransportTimeout()) ;
- if (result != State.STATE_PREPARING)
- {
- return result ;
- }
-
+ waitForState(State.STATE_PREPARING, TransportTimer.getTransportTimeout()) ;
+
synchronized(this)
{
- if ((state == State.STATE_PREPARING) && (timerTask != null))
+ if (state != State.STATE_PREPARING)
{
+ return state ;
+ }
+
+ if (timerTask != null)
+ {
timerTask.cancel() ;
+
+ timerTask = null;
}
return state ;
}
@@ -359,19 +368,31 @@
sendCommit() ;
}
- final State result = waitForState(State.STATE_COMMITTING, TransportTimer.getTransportTimeout()) ;
- if (result != State.STATE_COMMITTING)
- {
- return result ;
- }
-
+ waitForState(State.STATE_COMMITTING, TransportTimer.getTransportTimeout()) ;
+
synchronized(this)
{
- if ((state == State.STATE_COMMITTING) && (timerTask != null))
+ if (state != State.STATE_COMMITTING)
{
+ return state ;
+ }
+
+ if (timerTask != null)
+ {
timerTask.cancel() ;
+
+ timerTask = null;
}
- return state ;
+
+ // no answer means this entry will be saved in the log and the commit retried
+ // we remove this engine but leave a ghost to make sure we drop incoming
+ // prepared or completed messages from the client until we reinsert a new engine
+ // when recovery kicks in. we leave this engine in state COMMITTING so we resend
+ // the commit at the next recovery stage
+
+ CoordinatorProcessor.getProcessor().deactivateCoordinator(this, true) ;
+
+ return State.STATE_COMMITTING;
}
}
@@ -417,11 +438,17 @@
* Preparing -> Preparing (resend Prepare)
* Committing -> Committing (resend Commit)
*/
- private void commsTimeout()
+ private void commsTimeout(TimerTask caller)
{
final State current ;
synchronized(this)
{
+ if (timerTask != caller) {
+ // the timer was cancelled but it went off before it could be cancelled
+
+ return;
+ }
+
current = state ;
}
@@ -517,8 +544,10 @@
*/
private void forget()
{
+ // we don't leave a ghost entry here
+ CoordinatorProcessor.getProcessor().deactivateCoordinator(this, false) ;
+
changeState(null) ;
- CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
}
/**
@@ -528,6 +557,24 @@
*/
private void sendPrepare()
{
+ TimerTask newTimerTask = createTimerTask();
+ synchronized (this) {
+ // cancel any existing timer task
+
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+
+ // install the new timer task. this signals our intention to post a prepare which may need
+ // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
+ // our intention can be revised by another thread by reassigning the field to a new task
+ // or null
+
+ timerTask = newTimerTask;
+ }
+
+ // ok now try the prepare
+
try
{
ParticipantClient.getClient().sendPrepare(createContext(), instanceIdentifier) ;
@@ -540,7 +587,19 @@
}
}
- initiateTimer() ;
+ // reobtain the lock before deciding whether to schedule the timer
+
+ synchronized (this) {
+ if (timerTask == newTimerTask) {
+ // the timer task has not been cancelled so schedule it if appropriate
+ if (state == State.STATE_PREPARING) {
+ scheduleTimer(newTimerTask);
+ } else {
+ // no need to schedule it so get rid of it
+ timerTask = null;
+ }
+ }
+ }
}
/**
@@ -550,6 +609,24 @@
*/
private void sendCommit()
{
+ TimerTask newTimerTask = createTimerTask();
+ synchronized (this) {
+ // cancel any existing timer task
+
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+
+ // install the new timer task. this signals our intention to post a commit which may need
+ // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
+ // our intention can be revised by another thread by reassigning the field to a new task
+ // or null
+
+ timerTask = newTimerTask;
+ }
+
+ // ok now try the commit
+
try
{
ParticipantClient.getClient().sendCommit(createContext(), instanceIdentifier) ;
@@ -562,7 +639,19 @@
}
}
- initiateTimer() ;
+ // reobtain the lock before deciding whether to schedule the timer
+
+ synchronized (this) {
+ if (timerTask == newTimerTask) {
+ // the timer task has not been cancelled so schedule it if appropriate
+ if (state == State.STATE_COMMITTING) {
+ scheduleTimer(newTimerTask);
+ } else {
+ // no need to schedule it so get rid of it
+ timerTask = null;
+ }
+ }
+ }
}
/**
@@ -617,6 +706,30 @@
}
/**
+ * create a timer task to handle a comms timeout
+ *
+ * @return the timer task
+ */
+ private TimerTask createTimerTask()
+ {
+ return new TimerTask() {
+ public void run() {
+ commsTimeout(this) ;
+ }
+ } ;
+ }
+
+ /**
+ * schedule a timer task to handle a commms timeout
+ * @param timerTask the timer task to be scheduled
+ */
+
+ private void scheduleTimer(TimerTask timerTask)
+ {
+ TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ }
+
+ /**
* Initiate the timer.
*/
private synchronized void initiateTimer()
@@ -629,7 +742,7 @@
{
timerTask = new TimerTask() {
public void run() {
- commsTimeout() ;
+ commsTimeout(this) ;
}
} ;
TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -60,7 +60,12 @@
public ParticipantStub(final String id, final boolean durable, final EndpointReferenceType twoPCParticipant)
throws Exception
{
- coordinator = new CoordinatorEngine(id, durable, twoPCParticipant) ;
+ // id will be supplied as null during recovery in which case we can delay creation
+ // of the coordinator until restore_state is called
+
+ if (id != null) {
+ coordinator = new CoordinatorEngine(id, durable, twoPCParticipant) ;
+ }
}
public Vote prepare()
@@ -222,7 +227,7 @@
StreamHelper.checkNextStartTag(reader, QNAME_TWO_PC_PARTICIPANT) ;
final EndpointReferenceType endpointReferenceType = new EndpointReferenceType(reader) ;
- coordinator = new CoordinatorEngine(id, durable, endpointReferenceType, State.STATE_PREPARED_SUCCESS) ;
+ coordinator = new CoordinatorEngine(id, durable, endpointReferenceType, true, State.STATE_PREPARED_SUCCESS) ;
return true ;
}
catch (final Throwable th)
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -50,8 +50,10 @@
/**
* Deactivate the coordinator.
* @param coordinator The coordinator.
+ * @param leaveGhost true if a ghost activation entry should be left to indicate that the
+ * coordinator exists in a log entry and will be recovered at some later date
*/
- public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator) ;
+ public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost) ;
/**
* Aborted.
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -3,6 +3,7 @@
import com.arjuna.webservices.SoapFault;
import com.arjuna.webservices.SoapFaultType;
import com.arjuna.webservices.base.processors.ActivatedObjectProcessor;
+import com.arjuna.webservices.base.processors.ReactivatedObjectProcessor;
import com.arjuna.webservices.logging.WSTLogger;
import com.arjuna.webservices11.wsaddr.AddressingHelper;
import com.arjuna.webservices11.wsarj.ArjunaContext;
@@ -28,7 +29,7 @@
/**
* The activated object processor.
*/
- private final ActivatedObjectProcessor activatedObjectProcessor = new ActivatedObjectProcessor() ;
+ private final ReactivatedObjectProcessor activatedObjectProcessor = new ReactivatedObjectProcessor() ;
/**
* Activate the coordinator.
@@ -41,12 +42,14 @@
}
/**
- * Deactivate the coordinator.
+ * Deactivate a coordinator recovered from the log.
+ *
* @param coordinator The coordinator.
+ * @param leaveGhost true if a ghost activation entry should be left to indicate that the
+ * coordinator exists in a log entry and will be recovered at some later date
*/
- public void deactivateCoordinator(final CoordinatorInboundEvents coordinator)
- {
- activatedObjectProcessor.deactivateObject(coordinator) ;
+ public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
+ activatedObjectProcessor.deactivateObject(coordinator, leaveGhost);
}
/**
@@ -61,6 +64,17 @@
}
/**
+ * Tests if there is a ghost entry with the specified identifier.
+ * @param instanceIdentifier The coordinator identifier.
+ * @return true if there is a ghost entry.
+ */
+ private boolean getGhostCoordinator(final InstanceIdentifier instanceIdentifier)
+ {
+ final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
+ return activatedObjectProcessor.getGhost(identifier) ;
+ }
+
+ /**
* Aborted.
* @param aborted The aborted notification.
* @param addressingProperties The addressing context.
@@ -103,6 +117,7 @@
*
* @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_1 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_1] - Unexpected exception thrown from committed:
* @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2] - Committed called on unknown coordinator: {0}
+ * @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3] - Ignoring committed called on unidentified coordinator until recovery pass is complete: {0}
*/
public void committed(final Notification committed, final AddressingProperties addressingProperties,
final ArjunaContext arjunaContext)
@@ -126,7 +141,11 @@
}
else if (WSTLogger.arjLoggerI18N.isWarnEnabled())
{
- WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
+ if (!getGhostCoordinator(instanceIdentifier)) {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
+ } else {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3", new Object[] {instanceIdentifier}) ;
+ }
}
}
@@ -138,6 +157,7 @@
*
* @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_1 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_1] - Unexpected exception thrown from prepared:
* @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_2 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_2] - Prepared called on unknown coordinator: {0}
+ * @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_3 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_3] - Ignoring prepared called on unidentified coordinator until recovery pass is complete: {0}
*/
public void prepared(final Notification prepared, final AddressingProperties addressingProperties,
final ArjunaContext arjunaContext)
@@ -159,7 +179,7 @@
}
}
}
- else
+ else if (!getGhostCoordinator(instanceIdentifier))
{
if (WSTLogger.arjLoggerI18N.isWarnEnabled())
{
@@ -176,6 +196,16 @@
sendInvalidState(addressingProperties, arjunaContext) ;
}
}
+ else
+ {
+ // there may be a participant stub waitinng to be recovered from the log so drop the
+ // message, forcing the caller to retry
+
+ if (WSTLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.prepared_3", new Object[] {instanceIdentifier}) ;
+ }
+ }
}
/**
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -5,13 +5,11 @@
import com.arjuna.webservices.logging.WSTLogger;
import com.arjuna.webservices.util.TransportTimer;
import com.arjuna.webservices11.SoapFault11;
-import com.arjuna.webservices11.ServiceRegistry;
import com.arjuna.webservices11.wsaddr.AddressingHelper;
import com.arjuna.webservices11.wsarj.ArjunaContext;
import com.arjuna.webservices11.wsarj.InstanceIdentifier;
import com.arjuna.webservices11.wsat.CoordinatorInboundEvents;
import com.arjuna.webservices11.wsat.State;
-import com.arjuna.webservices11.wsat.AtomicTransactionConstants;
import com.arjuna.webservices11.wsat.client.ParticipantClient;
import com.arjuna.webservices11.wsat.processors.CoordinatorProcessor;
import com.arjuna.webservices11.wscoor.CoordinationConstants;
@@ -19,10 +17,7 @@
import org.oasis_open.docs.ws_tx.wsat._2006._06.Notification;
import javax.xml.namespace.QName;
-import javax.xml.ws.addressing.AddressingBuilder;
import javax.xml.ws.addressing.AddressingProperties;
-import javax.xml.ws.addressing.AttributedURI;
-import javax.xml.ws.addressing.Relationship;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import java.util.TimerTask;
@@ -53,6 +48,10 @@
*/
private State state ;
/**
+ * The flag indicating that this coordinator has been recovered from the log.
+ */
+ private boolean recovered ;
+ /**
* The flag indicating a read only response.
*/
private boolean readOnly ;
@@ -69,7 +68,7 @@
*/
public CoordinatorEngine(final String id, final boolean durable, final W3CEndpointReference participant)
{
- this(id, durable, participant, State.STATE_ACTIVE) ;
+ this(id, durable, participant, false, State.STATE_ACTIVE) ;
}
/**
@@ -79,13 +78,15 @@
* @param participant The participant endpoint reference.
* @param state The initial state.
*/
- public CoordinatorEngine(final String id, final boolean durable, final W3CEndpointReference participant, final State state)
+ public CoordinatorEngine(final String id, final boolean durable, final W3CEndpointReference participant, boolean recovered, final State state)
{
this.id = id ;
this.instanceIdentifier = new InstanceIdentifier(id) ;
this.durable = durable ;
this.participant = participant ;
this.state = state ;
+ this.recovered = recovered;
+
CoordinatorProcessor.getProcessor().activateCoordinator(this, id) ;
}
@@ -263,17 +264,20 @@
sendPrepare() ;
}
- final State result = waitForState(State.STATE_PREPARING, TransportTimer.getTransportTimeout()) ;
- if (result != State.STATE_PREPARING)
- {
- return result ;
- }
+ waitForState(State.STATE_PREPARING, TransportTimer.getTransportTimeout()) ;
synchronized(this)
{
- if ((state == State.STATE_PREPARING) && (timerTask != null))
+ if (state != State.STATE_PREPARING)
{
+ return state ;
+ }
+
+ if (timerTask != null)
+ {
timerTask.cancel() ;
+
+ timerTask = null;
}
return state ;
}
@@ -306,19 +310,37 @@
sendCommit() ;
}
- final State result = waitForState(State.STATE_COMMITTING, TransportTimer.getTransportTimeout()) ;
- if (result != State.STATE_COMMITTING)
- {
- return result ;
- }
+ waitForState(State.STATE_COMMITTING, TransportTimer.getTransportTimeout()) ;
synchronized(this)
{
- if ((state == State.STATE_COMMITTING) && (timerTask != null))
+ if (state != State.STATE_COMMITTING)
{
- timerTask.cancel() ;
+ return state ;
}
- return state ;
+
+ if (timerTask != null)
+ {
+ timerTask.cancel() ;
+
+ // this deals with a race to kill the timer task before it runs
+ // it may actually have gone off but not been able to call
+ // the method to clear this field before we entered this
+ // synchronized block. setting the field ot null here notifies
+ // the called method that it has been cancelled.
+
+ timerTask = null;
+ }
+
+ // no answer means this entry will be saved in the log and the commit retried
+ // we remove this engine but leave a ghost to make sure we drop incoming
+ // prepared or completed messages from the client until we reinsert a new engine
+ // when recovery kicks in. we leave this engine in state COMMITTING so we resend
+ // the commit at the next recovery stage
+
+ CoordinatorProcessor.getProcessor().deactivateCoordinator(this, true) ;
+
+ return State.STATE_COMMITTING;
}
}
@@ -364,11 +386,17 @@
* Preparing -> Preparing (resend Prepare)
* Committing -> Committing (resend Commit)
*/
- private void commsTimeout()
+ private void commsTimeout(TimerTask caller)
{
final State current ;
synchronized(this)
{
+ if (timerTask != caller) {
+ // the timer was cancelled but it went off before it could be cancelled
+
+ return;
+ }
+
current = state ;
}
@@ -464,8 +492,10 @@
*/
private void forget()
{
+ // we don't leave a ghost entry here
+ CoordinatorProcessor.getProcessor().deactivateCoordinator(this, false) ;
+
changeState(null) ;
- CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
}
/**
@@ -475,6 +505,24 @@
*/
private void sendPrepare()
{
+ TimerTask newTimerTask = createTimerTask();
+ synchronized (this) {
+ // cancel any existing timer task
+
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+
+ // install the new timer task. this signals our intention to post a prepare which may need
+ // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
+ // our intention can be revised by another thread by reassigning the field to a new task
+ // or null
+
+ timerTask = newTimerTask;
+ }
+
+ // ok now try the prepare
+
try
{
ParticipantClient.getClient().sendPrepare(participant, createContext(), instanceIdentifier) ;
@@ -487,7 +535,19 @@
}
}
- initiateTimer() ;
+ // reobtain the lock before deciding whether to schedule the timer
+
+ synchronized (this) {
+ if (timerTask == newTimerTask) {
+ // the timer task has not been cancelled so schedule it if appropriate
+ if (state == State.STATE_PREPARING) {
+ scheduleTimer(newTimerTask);
+ } else {
+ // no need to schedule it so get rid of it
+ timerTask = null;
+ }
+ }
+ }
}
/**
@@ -497,6 +557,24 @@
*/
private void sendCommit()
{
+ TimerTask newTimerTask = createTimerTask();
+ synchronized (this) {
+ // cancel any existing timer task
+
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+
+ // install the new timer task. this signals our intention to post a commit which may need
+ // rescheduling later but allows us to drop the lock on this while we are in the comms layer.
+ // our intention can be revised by another thread by reassigning the field to a new task
+ // or null
+
+ timerTask = newTimerTask;
+ }
+
+ // ok now try the commit
+
try
{
ParticipantClient.getClient().sendCommit(participant, createContext(), instanceIdentifier) ;
@@ -509,7 +587,19 @@
}
}
- initiateTimer() ;
+ // reobtain the lock before deciding whether to schedule the timer
+
+ synchronized (this) {
+ if (timerTask == newTimerTask) {
+ // the timer task has not been cancelled so schedule it if appropriate
+ if (state == State.STATE_COMMITTING) {
+ scheduleTimer(newTimerTask);
+ } else {
+ // no need to schedule it so get rid of it
+ timerTask = null;
+ }
+ }
+ }
}
/**
@@ -559,6 +649,30 @@
}
/**
+ * create a timer task to handle a comms timeout
+ *
+ * @return the timer task
+ */
+ private TimerTask createTimerTask()
+ {
+ return new TimerTask() {
+ public void run() {
+ commsTimeout(this) ;
+ }
+ } ;
+ }
+
+ /**
+ * schedule a timer task to handle a commms timeout
+ * @param timerTask the timer task to be scheduled
+ */
+
+ private void scheduleTimer(TimerTask timerTask)
+ {
+ TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
+ }
+
+ /**
* Initiate the timer.
*/
private synchronized void initiateTimer()
@@ -571,7 +685,7 @@
{
timerTask = new TimerTask() {
public void run() {
- commsTimeout() ;
+ commsTimeout(this) ;
}
} ;
TransportTimer.getTimer().schedule(timerTask, TransportTimer.getTransportPeriod()) ;
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -27,7 +27,12 @@
public ParticipantStub(final String id, final boolean durable, final W3CEndpointReference twoPCParticipant)
throws Exception
{
- coordinator = new CoordinatorEngine(id, durable, twoPCParticipant) ;
+ // id will be supplied as null during recovery in which case we can delay creation
+ // of the coordinator until restore_state is called
+
+ if (id != null) {
+ coordinator = new CoordinatorEngine(id, durable, twoPCParticipant) ;
+ }
}
public Vote prepare()
@@ -193,7 +198,7 @@
String eprefText = reader.getElementText();
StreamSource source = new StreamSource(new StringReader(eprefText));
final W3CEndpointReference endpointReference = new W3CEndpointReference(source);
- coordinator = new CoordinatorEngine(id, durable, endpointReference, State.STATE_PREPARED_SUCCESS) ;
+ coordinator = new CoordinatorEngine(id, durable, endpointReference, true, State.STATE_PREPARED_SUCCESS) ;
return true ;
}
catch (final Throwable th)
Modified: labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -79,7 +79,7 @@
*
* @param coordinator The participant.
*/
- public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
+ public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -81,7 +81,7 @@
*
* @param coordinator The participant.
*/
- public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
+ public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java 2008-06-19 16:22:08 UTC (rev 20644)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java 2008-06-19 16:24:37 UTC (rev 20645)
@@ -34,6 +34,8 @@
import com.arjuna.common.util.logging.VisibilityLevel;
import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ACCoordinator;
+import com.arjuna.webservices.base.processors.ActivatedObjectProcessor;
+import com.arjuna.webservices.base.processors.ReactivatedObjectProcessor;
import java.util.Vector;
import java.util.Enumeration;
@@ -86,7 +88,6 @@
}
/**
- * called by the service shutdown code after the recovery module is removed from the recovery managers
* module list in order to allow the implementations list to be purged of this module's implementations
*/
public void uninstall()
@@ -138,6 +139,12 @@
}
processTransactionsStatus() ;
+
+ // ok we will have left a ghost record in the reactivated object table for any
+ // entries still sitting in the log so we can safely reject messages for unknown,
+ // non-ghost identifiers
+
+ ReactivatedObjectProcessor.setReactivationProcessingStarted();
}
protected ACCoordinatorRecoveryModule (String type)
More information about the jboss-svn-commits
mailing list