The original implementation did not work as expected after a server restart. This is an improved version, in case someone else finds it useful.
I would still like someone from the jBPM team to provide some remarks about this implementation.
package com.mycomp.bpm.handler;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.drools.runtime.KnowledgeRuntime;
import org.drools.runtime.process.NodeInstance;
import org.drools.runtime.process.ProcessInstance;
import org.drools.runtime.process.WorkItem;
import org.drools.runtime.process.WorkItemHandler;
import org.drools.runtime.process.WorkItemManager;
import org.drools.runtime.process.WorkflowProcessInstance;
import org.jbpm.workflow.instance.node.CompositeContextNodeInstance;
import org.jbpm.workflow.instance.node.WorkItemNodeInstance;
/**
 * Work item handler for "Receive Task" work items.
 *
 * <p>When a receive task is executed, the handler remembers its work item id
 * under a key built from the process instance id and the task's
 * {@code MessageId} parameter. When
 * {@link #messageReceived(long, String, Object)} is later called, the matching
 * work item is completed with the message stored under the {@code "Message"}
 * result key.
 *
 * <p>The in-memory map is not persisted. If no entry is found for an incoming
 * message (for example because the work item was created before a server
 * restart), the handler falls back to scanning the active node instances of
 * the process instance for a matching receive task.
 *
 * <p>This class performs no synchronization of its own; the waiting map is a
 * plain {@link HashMap}.
 */
public class ReceiveTaskHandler implements WorkItemHandler {

    private static final Logger logger = Logger.getLogger(ReceiveTaskHandler.class.getName());

    /** Work item name used to recognise receive tasks while scanning node instances. */
    private static final String RECEIVE_TASK_NAME = "Receive Task";

    /** Work item parameter that carries the id of the message the task waits for. */
    private static final String MESSAGE_ID_PARAM = "MessageId";

    /** Result key under which the received message is handed back to the process. */
    private static final String MESSAGE_RESULT = "Message";

    /** Waiting work item ids, keyed by {@link #constructKey(long, String)}. */
    private final Map<String, Long> waiting = new HashMap<String, Long>();

    private KnowledgeRuntime ksession;

    /**
     * Creates a handler bound to the given runtime.
     *
     * @param ksession runtime used to look up process instances and to complete work items
     */
    public ReceiveTaskHandler(KnowledgeRuntime ksession) {
        this.ksession = ksession;
    }

    /**
     * Replaces the runtime used to look up process instances and to complete work items.
     *
     * @param ksession the new runtime
     */
    public void setKnowledgeRuntime(KnowledgeRuntime ksession) {
        this.ksession = ksession;
    }

    /**
     * Builds the key of the waiting map.
     *
     * @param processInstanceId id of the process instance owning the receive task
     * @param messageId         id of the message the task waits for
     * @return {@code processInstanceId + "|" + messageId}
     */
    private String constructKey(long processInstanceId, String messageId) {
        return processInstanceId + "|" + messageId;
    }

    /**
     * Registers the work item as waiting for its message. The work item is not
     * completed here; that happens in {@link #messageReceived(long, String, Object)}.
     *
     * <p>If an entry already exists for the same process instance and message id,
     * it is replaced by this work item's id.
     *
     * @param workItem the receive task work item; its {@code MessageId} parameter is read
     * @param manager  not used by this handler
     */
    @Override
    public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
        long processInstanceId = workItem.getProcessInstanceId();
        String messageId = (String) workItem.getParameter(MESSAGE_ID_PARAM);
        waiting.put(constructKey(processInstanceId, messageId), workItem.getId());
    }

    /**
     * Not supported: a message id alone does not identify the waiting work item.
     *
     * @param messageId ignored
     * @param message   ignored
     * @throws UnsupportedOperationException always; use
     *         {@link #messageReceived(long, String, Object)} instead
     */
    public void messageReceived(String messageId, Object message) {
        throw new UnsupportedOperationException("messageReceived(String messageId, Object message) method is not supported. " +
                "Instead use messageReceived(long, String, Object) method" );
    }

    /**
     * Searches a node instance, and recursively the children of a
     * {@link CompositeContextNodeInstance}, for a receive task waiting for the
     * given message id.
     *
     * @param prmNodeInstance node instance to inspect
     * @param prmMessageId    message id to match against the task's {@code MessageId} parameter
     * @return id of the first matching work item, or {@code null} if none matches
     */
    private Long findWorkItemId(NodeInstance prmNodeInstance,
                                String prmMessageId) {
        if (prmNodeInstance instanceof WorkItemNodeInstance) {
            WorkItem workItem = ((WorkItemNodeInstance) prmNodeInstance).getWorkItem();
            // Null-safe comparison: a work item that cannot be resolved, or a receive
            // task without a MessageId parameter, must not abort the whole search.
            if (workItem != null && RECEIVE_TASK_NAME.equals(workItem.getName())) {
                Object taskMessageId = workItem.getParameter(MESSAGE_ID_PARAM);
                if (taskMessageId != null && taskMessageId.equals(prmMessageId)) {
                    return workItem.getId();
                }
            }
        }
        if (prmNodeInstance instanceof CompositeContextNodeInstance) {
            CompositeContextNodeInstance container = (CompositeContextNodeInstance) prmNodeInstance;
            for (NodeInstance child : container.getNodeInstances(false)) {
                Long workItemId = findWorkItemId(child, prmMessageId);
                if (workItemId != null) {
                    return workItemId;
                }
            }
        }
        return null;
    }

    /**
     * Looks for a waiting receive task among the active node instances of a
     * process instance. Used when the in-memory map has no entry, e.g. for a work
     * item persisted before a restart of the server.
     *
     * @param processInstanceId id of the process instance to inspect
     * @param messageId         message id to match
     * @return id of the matching work item, or {@code null} if the process instance
     *         is unknown, is not a workflow process instance, or has no matching task
     */
    private Long findPersistedWorkItemId(long processInstanceId, String messageId) {
        ProcessInstance pi = ksession.getProcessInstance(processInstanceId);
        // instanceof also rejects null, so an unknown or already finished process
        // instance is reported as "not found" instead of causing an exception.
        if (!(pi instanceof WorkflowProcessInstance)) {
            return null;
        }
        for (NodeInstance nodeInstance : ((WorkflowProcessInstance) pi).getNodeInstances()) {
            Long workItemId = findWorkItemId(nodeInstance, messageId);
            if (workItemId != null) {
                return workItemId;
            }
        }
        return null;
    }

    /**
     * Delivers a message to the receive task waiting for it and completes that
     * task's work item with the message stored under the {@code "Message"} result key.
     *
     * <p>If no waiting work item can be found, an error is logged and nothing
     * else happens.
     *
     * @param processInstanceId id of the process instance owning the receive task
     * @param messageId         id of the message, matched against the task's
     *                          {@code MessageId} parameter
     * @param message           payload handed to the process
     */
    public void messageReceived(long processInstanceId, String messageId, Object message) {
        // Remove (not just read) the entry: once the work item is completed the
        // mapping is stale and would otherwise accumulate forever. If completion
        // fails, the scan below can still locate the task on a later attempt.
        Long workItemId = waiting.remove(constructKey(processInstanceId, messageId));
        if (workItemId == null) {
            // See if this is a work item persisted before the re-start of the server
            workItemId = findPersistedWorkItemId(processInstanceId, messageId);
        }
        if (workItemId == null) {
            logger.error("No workItem found for receiveTask of process with id:" + processInstanceId);
            return;
        }
        Map<String, Object> results = new HashMap<String, Object>();
        results.put(MESSAGE_RESULT, message);
        ksession.getWorkItemManager().completeWorkItem(workItemId, results);
    }

    /**
     * Forgets the waiting entry of an aborted receive task.
     *
     * @param workItem the work item being aborted; its {@code MessageId} parameter is read
     * @param manager  not used by this handler
     */
    @Override
    public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
        long processInstanceId = workItem.getProcessInstanceId();
        String messageId = (String) workItem.getParameter(MESSAGE_ID_PARAM);
        waiting.remove(constructKey(processInstanceId, messageId));
    }
}