[jboss-svn-commits] JBL Code SVN: r18422 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Feb 8 13:23:32 EST 2008


Author: kevin.conner at jboss.com
Date: 2008-02-08 13:23:32 -0500 (Fri, 08 Feb 2008)
New Revision: 18422

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java
Log:
Notify JobExecutor at end of transaction: JBESB-1543

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java	2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java	2008-02-08 18:23:32 UTC (rev 18422)
@@ -22,8 +22,13 @@
 package org.jboss.soa.esb.services.jbpm.cmd;
 
 import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
 
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+
 import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.TransactionStrategy;
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.command.SignalCommand;
@@ -36,6 +41,7 @@
 import org.jbpm.graph.exe.Token;
 import org.jbpm.instantiation.Delegation;
 import org.jbpm.job.ExecuteActionJob;
+import org.jbpm.job.executor.JobExecutor;
 import org.jbpm.msg.MessageService;
 import org.jbpm.svc.Services;
 
@@ -67,6 +73,10 @@
      * The name of the ESB asynchronous signal actor variable.
      */
     private static final String ESB_ASYNC_SIGNAL_ACTOR_VARIABLE_NAME = ESB_ASYNC_SIGNAL_VARIABLE_NAME + "ACTOR_" ;
+    /**
+     * Map of active synchronisations.
+     */
+    private static final ConcurrentHashMap<Transaction, Synchronization> SYNCHRONISATIONS = new ConcurrentHashMap<Transaction, Synchronization>() ;
     
     /**
      * Create an asynchronous signal job for the specified token and transition.
@@ -74,7 +84,7 @@
      * @param transitionName The transition to signal or null if the default transition is to be used.
      * @param actor The actor to use.
      */
-    static void createSignalJob(final Token token, final String transitionName, final String actor)
+    static void createSignalJob(final JbpmContext jbpmContext, final Token token, final String transitionName, final String actor)
     {
         final boolean isDebugEnabled = logger.isDebugEnabled() ;
         final long tokenId = token.getId() ;
@@ -89,9 +99,9 @@
         token.lock(ESB_ASYNC_SIGNAL_ACTION_NAME);
         
         final String transitionVariableName = ESB_ASYNC_SIGNAL_TRANSITION_VARIABLE_NAME + tokenId ;
-        setVariable(contextInstance, transitionVariableName, transitionName) ;
+        setVariable(contextInstance, token, transitionVariableName, transitionName) ;
         final String actorVariableName = ESB_ASYNC_SIGNAL_ACTOR_VARIABLE_NAME + tokenId ;
-        setVariable(contextInstance, actorVariableName, actor) ;
+        setVariable(contextInstance, token, actorVariableName, actor) ;
         final ExecuteActionJob signalJob = new ExecuteActionJob(token) ;
         signalJob.setAction(getAsyncSignalAction(token)) ;
         signalJob.setDueDate(new Date()) ;
@@ -107,23 +117,46 @@
         {
             logger.debug("Sent signal task to message service for token id " + tokenId + " from process instance " + processInstanceId) ;
         }
+        
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        try
+        {
+            if (transactionStrategy.isActive())
+            {
+                final Transaction transaction = (Transaction)transactionStrategy.getTransaction() ;
+                if ((transaction != null) && !SYNCHRONISATIONS.containsKey(transaction))
+                {
+                    final Synchronization synch = new JobNotifierSynchronisation(transaction, jbpmContext.getJbpmConfiguration().getJobExecutor()) ;
+                    transaction.registerSynchronization(synch) ;
+                    SYNCHRONISATIONS.put(transaction, synch) ;
+                }
+            }
+        }
+        catch (final Exception ex)
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Failed to register synchronization", ex) ;
+            }
+        }
     }
     
     /**
      * Set the context instance variable.
      * @param contextInstance The context instance.
+     * @param token The current token.
      * @param name The variable name
      * @param value The variable value
      */
-    private static void setVariable(final ContextInstance contextInstance, final String name, final String value)
+    private static void setVariable(final ContextInstance contextInstance, final Token token, final String name, final String value)
     {
         if (value != null)
         {
-            contextInstance.setVariable(name, value) ;
+            contextInstance.setVariableLocally(name, value, token) ;
         }
         else
         {
-            contextInstance.deleteVariable(name) ;
+            contextInstance.deleteVariable(name, token) ;
         }
     }
     
@@ -228,4 +261,51 @@
             }
         }
     }
+    
+    /**
+     * Transaction synchronisation that notifies the job executor's monitor after the transaction completes.
+     * @author kevin
+     */
+    private static final class JobNotifierSynchronisation implements Synchronization
+    {
+        /**
+         * The associated transaction; used as the key when removing this entry from SYNCHRONISATIONS.
+         */
+        private Transaction transaction ;
+        /**
+         * The current job executor, whose monitor is notified in afterCompletion.
+         */
+        private final JobExecutor jobExecutor ;
+        
+        /**
+         * Create the notifier synchronisation; both arguments are stored as given, without validation.
+         * @param transaction The current transaction.
+         * @param jobExecutor The current job executor (presumably non-null, as afterCompletion synchronises on it - TODO confirm).
+         */
+        public JobNotifierSynchronisation(final Transaction transaction, final JobExecutor jobExecutor)
+        {
+            this.transaction = transaction ;
+            this.jobExecutor = jobExecutor ;
+        }
+        
+        /**
+         * The before completion notification; performs no work.
+         */
+        public void beforeCompletion()
+        {
+        }
+        
+        /**
+         * The after completion notification: removes the transaction from SYNCHRONISATIONS, then notifies the job executor's monitor.
+         * @param status The status of the transaction (not examined; the notification is sent for every status).
+         */
+        public void afterCompletion(final int status)
+        {
+            SYNCHRONISATIONS.remove(transaction) ;
+            synchronized(jobExecutor)
+            {
+                jobExecutor.notify() ;
+            }
+        }
+    }
 }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java	2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java	2008-02-08 18:23:32 UTC (rev 18422)
@@ -125,7 +125,7 @@
                 contextInstance.addVariables(variables);
             }
             
-            AsyncProcessSignal.createSignalJob(token, transitionName, jbpmContext.getActorId()) ;
+            AsyncProcessSignal.createSignalJob(jbpmContext, token, transitionName, jbpmContext.getActorId()) ;
         } catch (CallbackException jbpmCe) {
             logger.warn(jbpmCe.getMessage());
         }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java	2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java	2008-02-08 18:23:32 UTC (rev 18422)
@@ -277,7 +277,7 @@
             if (result instanceof ProcessInstance)
             {
                 final ProcessInstance processInstance = (ProcessInstance)result ;
-                AsyncProcessSignal.createSignalJob(processInstance.getRootToken(), transitionName, getActorId()) ;
+                AsyncProcessSignal.createSignalJob(jbpmContext, processInstance.getRootToken(), transitionName, getActorId()) ;
             }
             return result ;
         }




More information about the jboss-svn-commits mailing list