Tom Baeyens closed JBPM-626.
----------------------------
Resolution: Incomplete Description
2) Since 3.2, there are no longer two separate threads for timers and messages; instead,
there is only one job executor.
CommandExecutorThread thread safety issues
------------------------------------------
Key: JBPM-626
URL: http://jira.jboss.com/jira/browse/JBPM-626
Project: JBoss jBPM
Issue Type: Bug
Components: Core Engine
Affects Versions: jBPM 3.1, jBPM jPDL 3.2.1, jBPM jPDL 3.2, jBPM 3.1.4, jBPM 3.1.3,
jBPM 3.1.2, jBPM 3.1.1
Environment: Windows XP Pro
Database: DB2/400 v5r3, HSQLDB
Reporter: Mark Shotton
Assigned To: Tom Baeyens
There appear to be a couple of thread safety problems with the behaviour of the
CommandExecutorThread:
(1) Consider the following code:
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
GraphSession graphSession = jbpmContext.getGraphSession();
// Find the process definition in the database
ProcessDefinition processDefinition =
graphSession.findLatestProcessDefinition(processDefinitionName);
// Create an execution of the process definition.
ProcessInstance processInstance = new ProcessInstance(processDefinition);
// Check that the process instance is in the start state
Token token = processInstance.getRootToken();
// Start the process execution
token.signal();
...
jbpmContext.save(processInstance);
} finally {
// Tear down the pojo persistence context.
jbpmContext.close();
}
If this code is executed while the token is in node "node 1", the token.signal() method
causes a node-enter event to fire on the next asynchronous node, "node 2", and a message to
be stored in the JBPM_MESSAGE table. This message identifies the token as being in "node
2". However, the node associated with the token in the JBPM_TOKEN table is still "node
1" until the jbpmContext.close() method flushes the process instance to the database.
So the JBPM_MESSAGE.NODE for the token is "node 2" but the JBPM_TOKEN.NODE for the same
token is "node 1".
This means that on line 53 of org.jbpm.command.ExecuteNodeCommand execute(), an exception
is thrown:
public void execute() {
if (! node.equals(token.getNode())) {
throw new JbpmException("couldn't continue execution for '"+token+"', token moved");
}
and an exception is added to the message so that it is not re-tried.
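The ordering problem described in (1) can be reproduced without a database. The following is a minimal, single-threaded sketch, not jBPM code: two maps stand in for the JBPM_TOKEN and JBPM_MESSAGE tables, and the names AsyncRaceSketch and nodeCheckPasses are hypothetical. It assumes only what is described above, namely that the message row becomes visible before the token row is flushed.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the race in (1); not jBPM code, all names are hypothetical. */
final class AsyncRaceSketch {
    // tokenId -> node name, standing in for JBPM_TOKEN.NODE
    static final Map<Long, String> tokenTable = new HashMap<>();
    // tokenId -> node name, standing in for JBPM_MESSAGE.NODE
    static final Map<Long, String> messageTable = new HashMap<>();

    /** Mirrors the node.equals(token.getNode()) check in ExecuteNodeCommand. */
    static boolean nodeCheckPasses(long tokenId) {
        String messageNode = messageTable.get(tokenId);
        return messageNode != null && messageNode.equals(tokenTable.get(tokenId));
    }

    public static void main(String[] args) {
        long tokenId = 1L;
        tokenTable.put(tokenId, "node 1");

        // token.signal(): the message for the asynchronous node is stored first.
        messageTable.put(tokenId, "node 2");
        // An executor that polls at this point sees the mismatch.
        System.out.println("before close: " + nodeCheckPasses(tokenId));

        // jbpmContext.close(): the token row is flushed afterwards.
        tokenTable.put(tokenId, "node 2");
        System.out.println("after close: " + nodeCheckPasses(tokenId));
    }
}
```

Running it prints "before close: false" and then "after close: true", which is the window in which the executor throws the "token moved" exception.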
As a work-around, I modified the org.jbpm.msg.command.CommandExecutorThread
handleProcessingException method in a pretty ugly way; I just leave the message in the
table to be re-tried if the exception message contains the String "token moved":
void handleProcessingException(MessageProcessingException e) {
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext(jbpmContextName);
try {
// get the message session from the context
DbMessageService dbMessageSessionImpl = (DbMessageService)
jbpmContext.getServices().getMessageService();
MessagingSession messageSession = dbMessageSessionImpl.getMessagingSession();
// get the problematic command message from the exception
Message message = e.message;
// MWS 01/04/06 Workaround for async continuations
//
// Will get this exception if the JBPM_MESSAGE.NODE is not the same as
// the node associated with the JBPM_MESSAGE.TOKEN (i.e. its JBPM_TOKEN.NODE)
// in the JBPM_MESSAGE table (see org.jbpm.command.ExecuteNodeCommand execute());
// however this can be caused by a token.signal()
// persisting the JBPM_MESSAGE (e.g. from a node-enter on an async node)
// before the token state is persisted by a subsequent call to
// jbpmContext.close(); e.g.
//
// try {
// ...
// token.signal(); // Writes to JBPM_MESSAGE if async node entered
// jbpmContext.save(processInstance);
// } finally {
// jbpmContext.close(); // Updates JBPM_TOKEN in DB
// }
//
// i.e. the above is not thread-safe
//
// As a workaround, leave the message to be processed again
// on the following invocation of the CommandExecutorThread
//
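// Guard against a missing cause as well as a missing message
Throwable cause = e.getCause();
if (cause != null && cause.getMessage() != null
&& cause.getMessage().indexOf("token moved") != -1) {
try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
// Restore the interrupt flag so the thread can still be shut down
Thread.currentThread().interrupt();
}
return;
}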
// remove the problematic message from the queue
dbMessageSessionImpl.receiveByIdNoWait(message.getId());
message = Message.createCopy(message);
// update the message with the stack trace
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
message.setException(sw.toString());
// update the message with the jbpm-error-queue destination
message.setDestination(errorDestination);
// resend
messageSession.save(message);
} finally {
jbpmContext.close();
}
}
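The work-around above matches on the exception text and leaves the message in the table with no upper bound on re-tries. One possible refinement is to decide on a typed exception and a bounded attempt count. The following is only a sketch in modern Java under stated assumptions: TokenMovedException, Decision and decide are hypothetical names and not part of jBPM, and the attempt count would have to be stored somewhere (for example on the message), which this issue does not describe.

```java
/** Sketch of a bounded retry decision; not jBPM API, all names are hypothetical. */
final class RetryDecisionSketch {
    enum Decision { RETRY, MOVE_TO_ERROR_QUEUE }

    /** Hypothetical marker for the "token moved" condition. */
    static final class TokenMovedException extends RuntimeException {
        TokenMovedException(String message) { super(message); }
    }

    /**
     * Retries only the "token moved" case, and only while attemptsSoFar is below
     * maxAttempts, so a message that never becomes consistent still reaches the
     * error queue.
     */
    static Decision decide(Throwable cause, int attemptsSoFar, int maxAttempts) {
        boolean tokenMoved = cause instanceof TokenMovedException; // false when cause is null
        return (tokenMoved && attemptsSoFar < maxAttempts)
                ? Decision.RETRY
                : Decision.MOVE_TO_ERROR_QUEUE;
    }

    public static void main(String[] args) {
        Throwable moved = new TokenMovedException("token moved");
        System.out.println(decide(moved, 0, 3));                               // first attempt
        System.out.println(decide(moved, 3, 3));                               // attempts exhausted
        System.out.println(decide(new IllegalStateException("other"), 0, 3));  // unrelated failure
        System.out.println(decide(null, 0, 3));                                // no cause at all
    }
}
```

Only the first call yields RETRY; the other three yield MOVE_TO_ERROR_QUEUE, so unrelated failures are still reported as before.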
(2) There also appears to be a problem with the interaction of the SchedulerThread and
the CommandExecutorThread. For a node "node 1" that is marked asynchronous and has a
timer that fires on node-enter with an associated action that executes immediately, the
ActionHandler on the timer action may progress the token from asynchronous node "node 1"
to node "node 2" and update the token in the JBPM_TOKEN table. However, the update of the
JBPM_TOKEN row may occur before the CommandExecutorThread has consumed the message in the
JBPM_MESSAGE table that was stored on entry into "node 1" and records that the token is
currently in "node 1".
So the JBPM_MESSAGE.NODE for the token is "node 1" but the JBPM_TOKEN.NODE for the same
token is "node 2". This means that on line 53 of org.jbpm.command.ExecuteNodeCommand
execute(), an exception is thrown (as described in (1)):
public void execute() {
if (! node.equals(token.getNode())) {
throw new JbpmException("couldn't continue execution for '"+token+"', token moved");
}
If node 2 is also asynchronous, the situation can arise where the same token ID appears
in the JBPM_MESSAGE table twice, with each row identifying a different node: "node 1" and
"node 2".
As a work-around, in the ActionHandler for the timer, I wait for any messages related to
the current node to be consumed by the CommandExecutorThread before progressing the token
(I make the assumption that the JBPM_MESSAGE table can only contain one message per
token):
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
graphSession = jbpmContext.getGraphSession();
//Load the processInstance from the database
processInstance = graphSession.loadProcessInstance(getProcessInstanceId());
...
MessagingSession messagingSession = jbpmContext.getMessagingSession();
for (;;) {
java.util.Iterator iterator =
messagingSession.getMessageIterator("CMD_EXECUTOR");
boolean foundToken = false;
while(iterator.hasNext()) {
Message message = (Message) iterator.next();
if (message.getToken() == token) {
foundToken = true;
}
}
if (foundToken) {
try {
System.out.println("Found message for token: " +
token.getId());
System.out.println("Waiting for CommandExecutorThread to consume message");
Thread.sleep(500);
} catch (InterruptedException e) {
//Do nothing
}
} else {
break;
}
}
...
//Progress over the 'credit_approved' transition
taskInstance.end("credit_approved");
jbpmContext.save(processInstance);
} finally {
jbpmContext.close();
}
Note that I open a new jbpmContext rather than getting this from the ExecutionContext
because the code above executes in a new thread spawned by the ActionHandler. However,
this work-around occasionally produces the following exception, so I'm obviously doing
something wrong (I haven't investigated this in detail as yet):
Found message for token: 6
Waiting for CommandExecutorThread to consume message
org.jbpm.persistence.JbpmPersistenceException: couldn't commit hibernate session
at org.jbpm.persistence.db.DbPersistenceService.close(DbPersistenceService.java:171)
at org.jbpm.svc.Services.close(Services.java:211)
at org.jbpm.JbpmContext.close(JbpmContext.java:138)
at com.mdsuk.bpm.webshop.actions.ReceiveCreditReferenceAsyncActionHandler.asyncExecute(ReceiveCreditReferenceAsyncActionHandler.java:103)
at com.mdsuk.bpm.webshop.actions.AsyncActionHandler.run(AsyncActionHandler.java:24)
at java.lang.Thread.run(Unknown Source)
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another
transaction (or unsaved-value mapping was incorrect): [org.jbpm.graph.exe.Token#6]
at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:1635)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2208)
at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2118)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2374)
at org.hibernate.action.EntityUpdateAction.execute(EntityUpdateAction.java:84)
at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:243)
at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:227)
at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:141)
at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:296)
at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:27)
at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:980)
at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:353)
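The StaleObjectStateException in the trace above is what Hibernate raises when a versioned update matches no row. A plausible reading, not a confirmed diagnosis, is that the spawned thread and the CommandExecutorThread both loaded Token#6 and the second one to flush lost. The sketch below models that with an in-memory map in modern Java; it is not Hibernate code, the names OptimisticLockSketch, Row and update are hypothetical, and it assumes the Token mapping carries a version column.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory sketch of version-based optimistic locking; not Hibernate code. */
final class OptimisticLockSketch {
    /** A row with a version column, as used for optimistic locking. */
    static final class Row {
        final String node;
        final int version;
        Row(String node, int version) { this.node = node; this.version = version; }
    }

    static final Map<Long, Row> tokenTable = new HashMap<>();

    /**
     * Models "UPDATE ... SET node = ?, version = version + 1 WHERE id = ? AND version = ?".
     * Returns false when no row matched, which is the case reported as a stale object.
     */
    static synchronized boolean update(long id, int expectedVersion, String newNode) {
        Row current = tokenTable.get(id);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        tokenTable.put(id, new Row(newNode, expectedVersion + 1));
        return true;
    }

    public static void main(String[] args) {
        tokenTable.put(6L, new Row("node 1", 0));
        // Two sessions both load Token#6 at version 0.
        int versionSeenByExecutor = tokenTable.get(6L).version;
        int versionSeenByActionThread = tokenTable.get(6L).version;

        System.out.println("executor update ok: "
                + update(6L, versionSeenByExecutor, "node 2"));
        System.out.println("action thread update ok: "
                + update(6L, versionSeenByActionThread, "node 3"));
    }
}
```

The first update succeeds and the second returns false, because the row is already at version 1 by the time the second session flushes.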