[jboss-svn-commits] JBL Code SVN: r20656 - in labs/jbosstm/trunk/XTS/WS-T: dev/src10/com/arjuna/wst/messaging and 8 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Jun 20 09:52:48 EDT 2008


Author: adinn
Date: 2008-06-20 09:52:48 -0400 (Fri, 20 Jun 2008)
New Revision: 20656

Modified:
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java
   labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java
   labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java
   labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java
Log:
Improved top-down recovery for WSAT coordinators so that the participant stub remains continuously activated from its first recovery until it is committed, in order to avoid missing committed messages. Fix for JBTM-346.

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/webservices/wsat/processors/CoordinatorProcessor.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -68,12 +68,16 @@
     /**
      * Deactivate the coordinator.
      * @param coordinator The coordinator.
-     * @param leaveGhost true if a ghost activation entry should be left to indicate that the
-     * coordinator exists in a log entry and will be recovered at some later date
      */
-    public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost) ;
+    public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator) ;
     
     /**
+     * Fetch the coordinator with a given identifier.
+     * @param identifier The identifier.
+     */
+    public abstract CoordinatorInboundEvents getCoordinator(final String identifier) ;
+
+    /**
      * Aborted.
      * @param aborted The aborted notification.
      * @param addressingContext The addressing context.

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/CoordinatorProcessorImpl.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -24,7 +24,6 @@
 import com.arjuna.webservices.SoapFault10;
 import com.arjuna.webservices.SoapFaultType;
 import com.arjuna.webservices.base.processors.ActivatedObjectProcessor;
-import com.arjuna.webservices.base.processors.ReactivatedObjectProcessor;
 import com.arjuna.webservices.logging.WSTLogger;
 import com.arjuna.webservices.wsaddr.AddressingContext;
 import com.arjuna.webservices.wsaddr.AttributedURIType;
@@ -47,7 +46,7 @@
     /**
      * The activated object processor.
      */
-    private final ReactivatedObjectProcessor activatedObjectProcessor = new ReactivatedObjectProcessor() ;
+    private final ActivatedObjectProcessor activatedObjectProcessor = new ActivatedObjectProcessor() ;
 
     /**
      * Activate the coordinator.
@@ -62,34 +61,30 @@
     /**
      * Deactivate the coordinator.
      * @param coordinator The coordinator.
-     * @param leaveGhost true if a ghost activation entry should be left to indicate that the
-     * coordinator exists in a log entry and will be recovered at some later date
      */
-    public void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost)
-    {
-        activatedObjectProcessor.deactivateObject(coordinator, leaveGhost) ;
+    public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
+        activatedObjectProcessor.deactivateObject(coordinator);
     }
 
     /**
      * Get the coordinator with the specified identifier.
-     * @param instanceIdentifier The coordinator identifier.
+     * @param identifier The coordinator identifier as a String.
      * @return The coordinator or null if not known.
      */
-    private CoordinatorInboundEvents getCoordinator(final InstanceIdentifier instanceIdentifier)
+
+    public CoordinatorInboundEvents getCoordinator(final String identifier)
     {
-        final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
         return (CoordinatorInboundEvents)activatedObjectProcessor.getObject(identifier) ;
     }
-    
     /**
-     * Tests if there is a ghost entry with the specified identifier.
-     * @param instanceIdentifier The coordinator identifier.
-     * @return true if there is a ghost entry.
+     * Get the coordinator with the specified identifier.
+     * @param instanceIdentifier The coordinator identifier as an InstanceIdentifier.
+     * @return The coordinator or null if not known.
      */
-    private boolean getGhostCoordinator(final InstanceIdentifier instanceIdentifier)
+    private CoordinatorInboundEvents getCoordinator(final InstanceIdentifier instanceIdentifier)
     {
         final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
-        return activatedObjectProcessor.getGhost(identifier) ;
+        return getCoordinator(identifier);
     }
 
     /**
@@ -135,7 +130,6 @@
      * 
      * @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_1 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_1] - Unexpected exception thrown from committed:
      * @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2] - Committed called on unknown coordinator: {0}
-     * @message com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3 [com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3] - Ignoring committed called on unidentified coordinator until recovery pass is complete: {0}
      */
     public void committed(final NotificationType committed, final AddressingContext addressingContext,
         final ArjunaContext arjunaContext)
@@ -159,11 +153,7 @@
         }
         else if (WSTLogger.arjLoggerI18N.isWarnEnabled())
         {
-            if (!getGhostCoordinator(instanceIdentifier)) {
-                WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
-            } else {
-                WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_3", new Object[] {instanceIdentifier}) ;
-            }
+            WSTLogger.arjLoggerI18N.warn("com.arjuna.wst.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
         }
     }
     
@@ -197,7 +187,7 @@
                 }
             }
         }
-        else if (!getGhostCoordinator(instanceIdentifier))
+        else if (areRecoveryLogEntriesAccountedFor())
         {
             if (WSTLogger.arjLoggerI18N.isWarnEnabled())
             {
@@ -216,7 +206,7 @@
         }
         else
         {
-            // there may be a participant stub waitinng to be recovered from the log so drop the
+            // there may be a participant stub waiting to be recovered from the log so drop the
             // message, forcing the caller to retry
 
             if (WSTLogger.arjLoggerI18N.isWarnEnabled())
@@ -411,4 +401,36 @@
             }
         }
     }
+
+    /**
+     * Notifies that all coordinator entries in the recovery log have been accounted for.
+     */
+
+    public static void setRecoveryLogEntriesAccountedFor()
+    {
+        recoveryLogEntriesAccountedFor = true;
+    }
+
+    /**
+     * Tests whether every coordinator entry in the recovery log has been accounted for.
+     *
+     * @return true if all entries are accounted for; false if unknown coordinator entries may remain.
+     */
+
+    private static boolean areRecoveryLogEntriesAccountedFor()
+    {
+        return recoveryLogEntriesAccountedFor;
+    }
+
+    /**
+     * False if there may be unknown coordinator entries in the recovery log, otherwise true.
+     * This field defaults to false at boot. It is set to true when the first log scan has
+     * completed, from which point onwards there will always be a record in the activation
+     * processor for each entry in the recovery log.
+     *
+     * The value is set to true by setRecoveryLogEntriesAccountedFor() and read by
+     * areRecoveryLogEntriesAccountedFor().
+     */
+
+    private static boolean recoveryLogEntriesAccountedFor = false;
 }

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/messaging/engines/CoordinatorEngine.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -374,23 +374,22 @@
         {
             if (state != State.STATE_COMMITTING)
             {
-                return state ;
-            }
+                // if this is a recovered participant then forget will not have
+                // deactivated the entry so that this (recovery) thread can
+                // detect it and update its log entry. so we need to deactivate
+                // the entry here.
 
-            if (timerTask != null)
-            {
-        	timerTask.cancel() ;
+                if (recovered) {
+                    CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
+                }
 
-                timerTask = null;
+                return state ;
             }
-            
-            // no answer means this entry will be saved in the log and the commit retried
-            // we remove this engine but leave a ghost to make sure we drop incoming
-            // prepared or completed messages from the client until we reinsert a new engine
-            // when recovery kicks in. we leave this engine in state COMMITTING so we resend
-            // the commit at the next recovery stage
 
-            CoordinatorProcessor.getProcessor().deactivateCoordinator(this, true) ;
+            // the participant is still uncommitted so it must be rewritten to the log.
+            // it remains activated in case a committed message comes in between now and
+            // the next scan. the recovery code will detect this active participant when
+            // rescanning the log and use it instead of recreating a new one.
 
             return State.STATE_COMMITTING;
         }
@@ -544,10 +543,22 @@
      */
     private void forget()
     {
-        // we don't leave a ghost entry here
-        CoordinatorProcessor.getProcessor().deactivateCoordinator(this, false) ;
+        // first, change state to null to indicate that the participant has completed.
 
         changeState(null) ;
+                   
+        // participants which have not been recovered from the log can be deactivated now.
+
+        // participants which have been recovered are left for the recovery thread to deactivate.
+        // this is because the recovery thread may have timed out waiting for a response to
+        // the commit message and gone on to complete its scan and suspend. the next scan
+        // will detect this activated participant and note that it has completed. if a crash
+        // happens in between the recovery thread can safely recreate and reactivate the
+        // participant and resend the commit since the commit/committed exchange is idempotent.
+
+        if (!recovered) {
+            CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
+        }
     }
     
     /**

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/ParticipantStub.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -51,6 +51,7 @@
 import com.arjuna.wst.Vote;
 import com.arjuna.wst.WrongStateException;
 import com.arjuna.wst.messaging.engines.CoordinatorEngine;
+import com.arjuna.wst.messaging.CoordinatorProcessorImpl;
 
 public class ParticipantStub implements Participant, PersistableParticipant
 {
@@ -226,8 +227,13 @@
             final XMLStreamReader reader = SoapUtils.getXMLStreamReader(new StringReader(eprValue)) ;
             StreamHelper.checkNextStartTag(reader, QNAME_TWO_PC_PARTICIPANT) ;
             final EndpointReferenceType endpointReferenceType = new EndpointReferenceType(reader) ;
-
-            coordinator = new CoordinatorEngine(id, durable, endpointReferenceType, true, State.STATE_PREPARED_SUCCESS) ;
+            // if we already have a coordinator from a previous recovery scan then reuse it
+            // with luck it will have been committed between the last scan and this one
+            coordinator = (CoordinatorEngine)CoordinatorProcessorImpl.getProcessor().getCoordinator(id);
+            if (coordinator == null) {
+                // no entry found so recreate one which is at the prepared stage
+                coordinator = new CoordinatorEngine(id, durable, endpointReferenceType, true, State.STATE_PREPARED_SUCCESS) ;
+            }
             return true ;
         }
         catch (final Throwable th)

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/webservices11/wsat/processors/CoordinatorProcessor.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -50,12 +50,16 @@
     /**
      * Deactivate the coordinator.
      * @param coordinator The coordinator.
-     * @param leaveGhost true if a ghost activation entry should be left to indicate that the
-     * coordinator exists in a log entry and will be recovered at some later date
      */
-    public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator, boolean leaveGhost) ;
+    public abstract void deactivateCoordinator(final CoordinatorInboundEvents coordinator) ;
 
     /**
+     * Fetch the coordinator with a given identifier.
+     * @param identifier The identifier.
+     */
+    public abstract CoordinatorInboundEvents getCoordinator(final String identifier) ;
+
+    /**
      * Aborted.
      * @param aborted The aborted notification.
      * @param addressingProperties The addressing context.

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/CoordinatorProcessorImpl.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -3,7 +3,6 @@
 import com.arjuna.webservices.SoapFault;
 import com.arjuna.webservices.SoapFaultType;
 import com.arjuna.webservices.base.processors.ActivatedObjectProcessor;
-import com.arjuna.webservices.base.processors.ReactivatedObjectProcessor;
 import com.arjuna.webservices.logging.WSTLogger;
 import com.arjuna.webservices11.wsaddr.AddressingHelper;
 import com.arjuna.webservices11.wsarj.ArjunaContext;
@@ -29,7 +28,7 @@
     /**
      * The activated object processor.
      */
-    private final ReactivatedObjectProcessor activatedObjectProcessor = new ReactivatedObjectProcessor() ;
+    private final ActivatedObjectProcessor activatedObjectProcessor = new ActivatedObjectProcessor() ;
 
     /**
      * Activate the coordinator.
@@ -45,33 +44,30 @@
      * Deactivate a coordinator recovered from the log.
      *
      * @param coordinator The coordinator.
-     * @param leaveGhost true if a ghost activation entry should be left to indicate that the
-     * coordinator exists in a log entry and will be recovered at some later date
      */
-    public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
-        activatedObjectProcessor.deactivateObject(coordinator,  leaveGhost);
+    public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
+        activatedObjectProcessor.deactivateObject(coordinator);
     }
 
     /**
      * Get the coordinator with the specified identifier.
-     * @param instanceIdentifier The coordinator identifier.
+     * @param identifier The coordinator identifier as a String.
      * @return The coordinator or null if not known.
      */
-    private CoordinatorInboundEvents getCoordinator(final InstanceIdentifier instanceIdentifier)
+
+    public CoordinatorInboundEvents getCoordinator(final String identifier)
     {
-        final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
         return (CoordinatorInboundEvents)activatedObjectProcessor.getObject(identifier) ;
     }
-
     /**
-     * Tests if there is a ghost entry with the specified identifier.
-     * @param instanceIdentifier The coordinator identifier.
-     * @return true if there is a ghost entry.
+     * Get the coordinator with the specified identifier.
+     * @param instanceIdentifier The coordinator identifier as an InstanceIdentifier.
+     * @return The coordinator or null if not known.
      */
-    private boolean getGhostCoordinator(final InstanceIdentifier instanceIdentifier)
+    private CoordinatorInboundEvents getCoordinator(final InstanceIdentifier instanceIdentifier)
     {
         final String identifier = (instanceIdentifier != null ? instanceIdentifier.getInstanceIdentifier() : null) ;
-        return activatedObjectProcessor.getGhost(identifier) ;
+        return getCoordinator(identifier);
     }
 
     /**
@@ -117,7 +113,6 @@
      *
      * @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_1 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_1] - Unexpected exception thrown from committed:
      * @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2] - Committed called on unknown coordinator: {0}
-     * @message com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3 [com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3] - Ignoring committed called on unidentified coordinator until recovery pass is complete: {0}
      */
     public void committed(final Notification committed, final AddressingProperties addressingProperties,
         final ArjunaContext arjunaContext)
@@ -141,11 +136,7 @@
         }
         else if (WSTLogger.arjLoggerI18N.isWarnEnabled())
         {
-            if (!getGhostCoordinator(instanceIdentifier)) {
-                WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
-            } else {
-                WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_3", new Object[] {instanceIdentifier}) ;
-            }
+            WSTLogger.arjLoggerI18N.warn("com.arjuna.wst11.messaging.CoordinatorProcessorImpl.committed_2", new Object[] {instanceIdentifier}) ;
         }
     }
 
@@ -179,7 +170,7 @@
                 }
             }
         }
-        else if (!getGhostCoordinator(instanceIdentifier))
+        else if (areRecoveryLogEntriesAccountedFor())
         {
             if (WSTLogger.arjLoggerI18N.isWarnEnabled())
             {
@@ -198,7 +189,7 @@
         }
         else
         {
-            // there may be a participant stub waitinng to be recovered from the log so drop the
+            // there may be a participant stub waiting to be recovered from the log so drop the
             // message, forcing the caller to retry
 
             if (WSTLogger.arjLoggerI18N.isWarnEnabled())
@@ -334,4 +325,36 @@
             }
         }
     }
+
+    /**
+     * Notifies that all coordinator entries in the recovery log have been accounted for.
+     */
+
+    public static void setRecoveryLogEntriesAccountedFor()
+    {
+        recoveryLogEntriesAccountedFor = true;
+    }
+
+    /**
+     * Tests whether every coordinator entry in the recovery log has been accounted for.
+     *
+     * @return true if all entries are accounted for; false if unknown coordinator entries may remain.
+     */
+
+    private static boolean areRecoveryLogEntriesAccountedFor()
+    {
+        return recoveryLogEntriesAccountedFor;
+    }
+
+    /**
+     * False if there may be unknown coordinator entries in the recovery log, otherwise true.
+     * This field defaults to false at boot. It is set to true when the first log scan has
+     * completed, from which point onwards there will always be a record in the activation
+     * processor for each entry in the recovery log.
+     *
+     * The value is set to true by setRecoveryLogEntriesAccountedFor() and read by
+     * areRecoveryLogEntriesAccountedFor().
+     */
+
+    private static boolean recoveryLogEntriesAccountedFor = false;
 }

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/messaging/engines/CoordinatorEngine.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -316,30 +316,23 @@
         {
             if (state != State.STATE_COMMITTING)
             {
-                return state ;
-            }
+                // if this is a recovered participant then forget will not have
+                // deactivated the entry so that this (recovery) thread can
+                // detect it and update its log entry. so we need to deactivate
+                // the entry here.
 
-            if (timerTask != null)
-            {
-                timerTask.cancel() ;
+                if (recovered) {
+                    CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
+                }
 
-                // this deals with a race to kill the timer task before it runs
-                // it may actually have gone off but not been able to call
-                // the method to clear this field before we entered this
-                // synchronized block. setting the field ot null here notifies
-                // the called method that it has been cancelled.
-
-                timerTask = null;
+                return state ;
             }
 
-            // no answer means this entry will be saved in the log and the commit retried
-            // we remove this engine but leave a ghost to make sure we drop incoming
-            // prepared or completed messages from the client until we reinsert a new engine
-            // when recovery kicks in. we leave this engine in state COMMITTING so we resend
-            // the commit at the next recovery stage
+            // the participant is still uncommitted so it must be rewritten to the log.
+            // it remains activated in case a committed message comes in between now and
+            // the next scan. the recovery code will detect this active participant when
+            // rescanning the log and use it instead of recreating a new one.
 
-            CoordinatorProcessor.getProcessor().deactivateCoordinator(this, true) ;
-
             return State.STATE_COMMITTING;
         }
     }
@@ -492,10 +485,22 @@
      */
     private void forget()
     {
-        // we don't leave a ghost entry here
-        CoordinatorProcessor.getProcessor().deactivateCoordinator(this, false) ;
+        // first, change state to null to indicate that the participant has completed.
 
         changeState(null) ;
+
+        // participants which have not been recovered from the log can be deactivated now.
+
+        // participants which have been recovered are left for the recovery thread to deactivate.
+        // this is because the recovery thread may have timed out waiting for a response to
+        // the commit message and gone on to complete its scan and suspend. the next scan
+        // will detect this activated participant and note that it has completed. if a crash
+        // happens in between the recovery thread can safely recreate and reactivate the
+        // participant and resend the commit since the commit/committed exchange is idempotent.
+
+        if (!recovered) {
+            CoordinatorProcessor.getProcessor().deactivateCoordinator(this) ;
+        }
     }
 
     /**

Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/ParticipantStub.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -3,6 +3,7 @@
 import com.arjuna.wst.*;
 import com.arjuna.wst.stub.SystemCommunicationException;
 import com.arjuna.wst11.messaging.engines.CoordinatorEngine;
+import com.arjuna.wst11.messaging.CoordinatorProcessorImpl;
 import com.arjuna.webservices11.wsat.State;
 import com.arjuna.webservices11.wsat.processors.CoordinatorProcessor;
 import com.arjuna.webservices11.util.StreamHelper;
@@ -198,7 +199,13 @@
             String eprefText = reader.getElementText();
             StreamSource source = new StreamSource(new StringReader(eprefText));
             final W3CEndpointReference endpointReference = new W3CEndpointReference(source);
-            coordinator = new CoordinatorEngine(id, durable, endpointReference, true, State.STATE_PREPARED_SUCCESS) ;
+            // if we already have a coordinator from a previous recovery scan then reuse it
+            // with luck it will have been committed between the last scan and this one
+            coordinator = (CoordinatorEngine)CoordinatorProcessorImpl.getProcessor().getCoordinator(id);
+            if (coordinator == null) {
+                // no entry found so recreate one which is at the prepared stage
+                coordinator = new CoordinatorEngine(id, durable, endpointReference, true, State.STATE_PREPARED_SUCCESS) ;
+            }
             return true ;
         }
         catch (final Throwable th)

Modified: labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst/tests/junit/TestCoordinatorProcessor.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -79,10 +79,19 @@
      *
      * @param coordinator The participant.
      */
-    public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
+    public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    /**
+     * Fetch the coordinator with a given identifier.
+     *
+     * @param identifier The identifier.
+     */
+    public CoordinatorInboundEvents getCoordinator(String identifier) {
+        return null;
+    }
+
     public void aborted(NotificationType aborted,
             AddressingContext addressingContext, ArjunaContext arjunaContext)
     {

Modified: labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java	2008-06-20 13:46:05 UTC (rev 20655)
+++ labs/jbosstm/trunk/XTS/WS-T/tests/src/com/arjuna/wst11/tests/junit/TestCoordinatorProcessor.java	2008-06-20 13:52:48 UTC (rev 20656)
@@ -81,10 +81,19 @@
      *
      * @param coordinator The participant.
      */
-    public void deactivateCoordinator(CoordinatorInboundEvents coordinator, boolean leaveGhost) {
+    public void deactivateCoordinator(CoordinatorInboundEvents coordinator) {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    /**
+     * Fetch the coordinator with a given identifier.
+     *
+     * @param identifier The identifier.
+     */
+    public CoordinatorInboundEvents getCoordinator(String identifier) {
+        return null;
+    }
+
     public void aborted(Notification aborted,
             AddressingProperties addressingProperties, ArjunaContext arjunaContext)
     {




More information about the jboss-svn-commits mailing list