BPMN2 JUnit test for BPMN2-ReceiveTask is implemented as
ksession.getWorkItemManager().registerWorkItemHandler("Receive Task", receiveTaskHandler);
...
receiveTaskHandler.messageReceived("HelloMessage", "Hello john!");
I tried to use ReceiveTaskHandler class in a project to receive a notification from an external system for a specific process-instance and resume the process upon receipt of the message. I could not figure out how to specify the process-instance for which the received message is applicable.
JUinit test works fine as there is only one process instance in the session during the execution of this test. The source code for ReceiveTaskHandler is:
public class ReceiveTaskHandler implements WorkItemHandler {
// TODO: use correlation instead of message id
private Map<String, Long> waiting = new HashMap<String, Long>();
...
public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
String messageId = (String) workItem.getParameter("MessageId");
waiting.put(messageId, workItem.getId());
// If waiting map previously contained a mapping for messageId, the old value is replaced !!!
}
public void messageReceived(String messageId, Object message) {
Long workItemId = waiting.get(messageId);
if (workItemId == null) {
return;
}
Map<String, Object> results = new HashMap<String, Object>();
results.put("Message", message);
ksession.getWorkItemManager().completeWorkItem(workItemId, results);
}
...
}
The comment of // TODO: use correlation instead of message id at the top of the source file also gave the impression that the implementation is not complete. As an interim solution, I decided to extend ReceiveTaskHandler class as follows:
public class ReceiveTaskHandler extends org.jbpm.bpmn2.handler.ReceiveTaskHandler {
private Map<String, Long> waiting = new HashMap<String, Long>();
private KnowledgeRuntime ksession;
public ReceiveTaskHandler(KnowledgeRuntime ksession) {
super(ksession);
this.ksession = ksession;
}
private String constructKey(long processInstanceId, String messageId) {
return processInstanceId + "|" + messageId;
}
@Override
public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
long processInstanceId = workItem.getProcessInstanceId();
String messageId = (String) workItem.getParameter("MessageId");
waiting.put(constructKey(processInstanceId, messageId), workItem.getId());
// If waiting map previously contained a mapping for messageId, the old value is replaced !!!
}
@Override
public void messageReceived(String messageId, Object message) {
throw new UnsupportedOperationException(
"messageReceived(String messageId, Object message) method is not supported. " +
"Instead use messageReceived(long, String, Object) method" );
}
public void messageReceived(long processInstanceId, String messageId, Object message) {
Long workItemId = waiting.get(constructKey(processInstanceId, messageId));
if (workItemId == null) {
return;
}
Map<String, Object> results = new HashMap<String, Object>();
results.put("Message", message);
ksession.getWorkItemManager().completeWorkItem(workItemId, results);
}
}
I was wondering if "Receive Task" is the right task to use for waiting an external notification. If yes, then can any core jBPM contributor review the extension that I used locally and consider adding it to the next release of jBPM.