[jboss-svn-commits] JBL Code SVN: r18422 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Feb 8 13:23:32 EST 2008
Author: kevin.conner at jboss.com
Date: 2008-02-08 13:23:32 -0500 (Fri, 08 Feb 2008)
New Revision: 18422
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java
Log:
Notify JobExecutor at end of transaction: JBESB-1543
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java 2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/AsyncProcessSignal.java 2008-02-08 18:23:32 UTC (rev 18422)
@@ -22,8 +22,13 @@
package org.jboss.soa.esb.services.jbpm.cmd;
import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+
import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.TransactionStrategy;
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.command.SignalCommand;
@@ -36,6 +41,7 @@
import org.jbpm.graph.exe.Token;
import org.jbpm.instantiation.Delegation;
import org.jbpm.job.ExecuteActionJob;
+import org.jbpm.job.executor.JobExecutor;
import org.jbpm.msg.MessageService;
import org.jbpm.svc.Services;
@@ -67,6 +73,10 @@
* The name of the ESB asynchronous signal actor variable.
*/
private static final String ESB_ASYNC_SIGNAL_ACTOR_VARIABLE_NAME = ESB_ASYNC_SIGNAL_VARIABLE_NAME + "ACTOR_" ;
+ /**
+ * Map of active synchronisations.
+ */
+ private static final ConcurrentHashMap<Transaction, Synchronization> SYNCHRONISATIONS = new ConcurrentHashMap<Transaction, Synchronization>() ;
/**
* Create an asynchronous signal job for the specified token and transition.
@@ -74,7 +84,7 @@
* @param transitionName The transition to signal or null if the default transition is to be used.
* @param actor The actor to use.
*/
- static void createSignalJob(final Token token, final String transitionName, final String actor)
+ static void createSignalJob(final JbpmContext jbpmContext, final Token token, final String transitionName, final String actor)
{
final boolean isDebugEnabled = logger.isDebugEnabled() ;
final long tokenId = token.getId() ;
@@ -89,9 +99,9 @@
token.lock(ESB_ASYNC_SIGNAL_ACTION_NAME);
final String transitionVariableName = ESB_ASYNC_SIGNAL_TRANSITION_VARIABLE_NAME + tokenId ;
- setVariable(contextInstance, transitionVariableName, transitionName) ;
+ setVariable(contextInstance, token, transitionVariableName, transitionName) ;
final String actorVariableName = ESB_ASYNC_SIGNAL_ACTOR_VARIABLE_NAME + tokenId ;
- setVariable(contextInstance, actorVariableName, actor) ;
+ setVariable(contextInstance, token, actorVariableName, actor) ;
final ExecuteActionJob signalJob = new ExecuteActionJob(token) ;
signalJob.setAction(getAsyncSignalAction(token)) ;
signalJob.setDueDate(new Date()) ;
@@ -107,23 +117,46 @@
{
logger.debug("Sent signal task to message service for token id " + tokenId + " from process instance " + processInstanceId) ;
}
+
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ try
+ {
+ if (transactionStrategy.isActive())
+ {
+ final Transaction transaction = (Transaction)transactionStrategy.getTransaction() ;
+ if ((transaction != null) && !SYNCHRONISATIONS.containsKey(transaction))
+ {
+ final Synchronization synch = new JobNotifierSynchronisation(transaction, jbpmContext.getJbpmConfiguration().getJobExecutor()) ;
+ transaction.registerSynchronization(synch) ;
+ SYNCHRONISATIONS.put(transaction, synch) ;
+ }
+ }
+ }
+ catch (final Exception ex)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Failed to register synchronization", ex) ;
+ }
+ }
}
/**
* Set the context instance variable.
* @param contextInstance The context instance.
+ * @param token The current token.
* @param name The variable name
* @param value The variable value
*/
- private static void setVariable(final ContextInstance contextInstance, final String name, final String value)
+ private static void setVariable(final ContextInstance contextInstance, final Token token, final String name, final String value)
{
if (value != null)
{
- contextInstance.setVariable(name, value) ;
+ contextInstance.setVariableLocally(name, value, token) ;
}
else
{
- contextInstance.deleteVariable(name) ;
+ contextInstance.deleteVariable(name, token) ;
}
}
@@ -228,4 +261,51 @@
}
}
}
+
+ /**
+ * Synchronisation to notify the job executor.
+ * @author kevin
+ */
+ private static final class JobNotifierSynchronisation implements Synchronization
+ {
+ /**
+ * The associated transaction.
+ */
+ private Transaction transaction ;
+ /**
+ * The current job executor.
+ */
+ private final JobExecutor jobExecutor ;
+
+ /**
+ * Create the notifier synchronisation.
+ * @param transaction The current transaction.
+ * @param jobExecutor The current job executor.
+ */
+ public JobNotifierSynchronisation(final Transaction transaction, final JobExecutor jobExecutor)
+ {
+ this.transaction = transaction ;
+ this.jobExecutor = jobExecutor ;
+ }
+
+ /**
+ * The before completion notification.
+ */
+ public void beforeCompletion()
+ {
+ }
+
+ /**
+ * The after completion notification.
+ * @param status The status of the transaction.
+ */
+ public void afterCompletion(final int status)
+ {
+ SYNCHRONISATIONS.remove(transaction) ;
+ synchronized(jobExecutor)
+ {
+ jobExecutor.notify() ;
+ }
+ }
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java 2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CallbackCommand.java 2008-02-08 18:23:32 UTC (rev 18422)
@@ -125,7 +125,7 @@
contextInstance.addVariables(variables);
}
- AsyncProcessSignal.createSignalJob(token, transitionName, jbpmContext.getActorId()) ;
+ AsyncProcessSignal.createSignalJob(jbpmContext, token, transitionName, jbpmContext.getActorId()) ;
} catch (CallbackException jbpmCe) {
logger.warn(jbpmCe.getMessage());
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java 2008-02-08 18:12:10 UTC (rev 18421)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/services/jbpm/src/main/java/org/jboss/soa/esb/services/jbpm/cmd/CommandExecutor.java 2008-02-08 18:23:32 UTC (rev 18422)
@@ -277,7 +277,7 @@
if (result instanceof ProcessInstance)
{
final ProcessInstance processInstance = (ProcessInstance)result ;
- AsyncProcessSignal.createSignalJob(processInstance.getRootToken(), transitionName, getActorId()) ;
+ AsyncProcessSignal.createSignalJob(jbpmContext, processInstance.getRootToken(), transitionName, getActorId()) ;
}
return result ;
}
More information about the jboss-svn-commits
mailing list