JBoss Community

ReceiveTaskHandler handling multiple process instances with the same message-id

created by Melih Cetin in jBPM

The BPMN2 JUnit test for BPMN2-ReceiveTask is implemented as:

 

ksession.getWorkItemManager().registerWorkItemHandler("Receive Task", receiveTaskHandler);

...

receiveTaskHandler.messageReceived("HelloMessage", "Hello john!");

 

I tried to use the ReceiveTaskHandler class in a project to receive a notification from an external system for a specific process instance and to resume the process when the message arrives. I could not figure out how to specify the process instance to which the received message applies.

 

The JUnit test works fine because there is only one process instance in the session while the test runs. The source code for ReceiveTaskHandler is:

 

public class ReceiveTaskHandler implements WorkItemHandler {

   

    // TODO: use correlation instead of message id

    private Map<String, Long> waiting = new HashMap<String, Long>();

        ...

    public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

        String messageId = (String) workItem.getParameter("MessageId");

        waiting.put(messageId, workItem.getId());

        // If waiting map previously contained a mapping for messageId, the old value is replaced !!!

    }

   

    public void messageReceived(String messageId, Object message) {

        Long workItemId = waiting.get(messageId);

        if (workItemId == null) {

            return;

        }

        Map<String, Object> results = new HashMap<String, Object>();

        results.put("Message", message);

        ksession.getWorkItemManager().completeWorkItem(workItemId, results);

    }

    ...

}
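
To make the collision concrete, here is a minimal stand-alone sketch that mimics only the waiting map of the handler. It does not use any jBPM classes; the class name WaitingMapCollision, its two methods, and the work item ids 101 and 102 are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the handler's bookkeeping; this is not jBPM code.
class WaitingMapCollision {

    // Same shape as the handler's map: messageId -> workItemId
    private final Map<String, Long> waiting = new HashMap<String, Long>();

    // Mirrors the map update in executeWorkItem; returns the replaced id, or null
    Long register(String messageId, long workItemId) {
        return waiting.put(messageId, workItemId);
    }

    // Mirrors the lookup in messageReceived
    Long lookup(String messageId) {
        return waiting.get(messageId);
    }

    public static void main(String[] args) {
        WaitingMapCollision handler = new WaitingMapCollision();
        // Work item of the first process instance (id 101 is an assumption)
        handler.register("HelloMessage", 101L);
        // A second process instance waits on the same message id
        Long replaced = handler.register("HelloMessage", 102L);
        System.out.println("replaced work item: " + replaced);
        System.out.println("reachable work item: " + handler.lookup("HelloMessage"));
    }
}
```

After the second registration only work item 102 can be looked up, so a message intended for the first process instance would complete the wrong work item and the first instance would keep waiting.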

 

The // TODO: use correlation instead of message id comment at the top of the source file also gave me the impression that the implementation is not complete. As an interim solution, I extended the ReceiveTaskHandler class as follows:

 

public class ReceiveTaskHandler extends org.jbpm.bpmn2.handler.ReceiveTaskHandler {

   

    private Map<String, Long> waiting = new HashMap<String, Long>();

    private KnowledgeRuntime ksession;

 

    public ReceiveTaskHandler(KnowledgeRuntime ksession) {

       super(ksession);

       this.ksession = ksession;

    }

 

    private String constructKey(long processInstanceId, String messageId) {

        return processInstanceId + "|" + messageId;

    }

   

    @Override

    public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

        long processInstanceId = workItem.getProcessInstanceId();

        String messageId = (String) workItem.getParameter("MessageId");

        waiting.put(constructKey(processInstanceId, messageId), workItem.getId());

        // The key now includes the process instance id, so an entry is replaced only if
        // the same process instance registers the same messageId again

    }

   

    @Override

    public void messageReceived(String messageId, Object message) {

        throw new UnsupportedOperationException(

                  "messageReceived(String messageId, Object message) method is not supported. " +

                  "Instead use messageReceived(long, String, Object) method" );

    }

   

    public void messageReceived(long processInstanceId, String messageId, Object message) {

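        // remove() instead of get(): drop the entry so that the map does not grow and
        // the same work item is not completed twice

        Long workItemId = waiting.remove(constructKey(processInstanceId, messageId));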

        if (workItemId == null) {

            return;

        }

        Map<String, Object> results = new HashMap<String, Object>();

        results.put("Message", message);

        ksession.getWorkItemManager().completeWorkItem(workItemId, results);

    }

}
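
One possible refinement of the constructKey approach, sketched here without any jBPM classes, is to use a small key type instead of a concatenated string and to remove the entry when the message is delivered. The names CorrelationRegistry, Key, register and claim are hypothetical, and the use of ConcurrentHashMap assumes that notifications may arrive on a different thread than the one executing the process.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; none of these names come from jBPM.
class CorrelationRegistry {

    // Value-type key: (processInstanceId, messageId)
    static final class Key {
        private final long processInstanceId;
        private final String messageId;

        Key(long processInstanceId, String messageId) {
            this.processInstanceId = processInstanceId;
            this.messageId = Objects.requireNonNull(messageId, "messageId");
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key other = (Key) o;
            return processInstanceId == other.processInstanceId
                    && messageId.equals(other.messageId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(processInstanceId, messageId);
        }
    }

    // Thread-safe map: (processInstanceId, messageId) -> workItemId
    private final ConcurrentMap<Key, Long> waiting = new ConcurrentHashMap<Key, Long>();

    // Would be called from executeWorkItem
    void register(long processInstanceId, String messageId, long workItemId) {
        waiting.put(new Key(processInstanceId, messageId), workItemId);
    }

    // Would be called from messageReceived(long, String, Object).
    // Returns the waiting work item id and forgets it, or null if nothing is waiting.
    Long claim(long processInstanceId, String messageId) {
        return waiting.remove(new Key(processInstanceId, messageId));
    }
}
```

With this shape, two process instances waiting on "HelloMessage" are kept apart, and a second delivery for the same process instance finds nothing to complete.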

 

I was wondering whether "Receive Task" is the right task to use for waiting for an external notification. If so, could a core jBPM contributor review the extension that I am using locally and consider adding it to the next release of jBPM?
