JBoss Community

Re: ReceiveTaskHandler handling multiple process instances with the same message-id

created by Melih Cetin in jBPM - View the full discussion

The original implementation did not work as expected after a server restart. This is an improved version, in case someone else finds it useful.

 

I would still like someone from the jBPM team to provide some remarks about this implementation.

 

package com.mycomp.bpm.handler;

 

import java.util.HashMap;

import java.util.Map;

 

import org.apache.log4j.Logger;

import org.drools.runtime.KnowledgeRuntime;

import org.drools.runtime.process.NodeInstance;

import org.drools.runtime.process.ProcessInstance;

import org.drools.runtime.process.WorkItem;

import org.drools.runtime.process.WorkItemHandler;

import org.drools.runtime.process.WorkItemManager;

import org.drools.runtime.process.WorkflowProcessInstance;

import org.jbpm.workflow.instance.node.CompositeContextNodeInstance;

import org.jbpm.workflow.instance.node.WorkItemNodeInstance;

 

public class ReceiveTaskHandler implements WorkItemHandler {

   

    private static Logger logger = Logger.getLogger(ReceiveTaskHandler.class.getName());

   

    private Map<String, Long> waiting = new HashMap<String, Long>();

    private KnowledgeRuntime ksession;

 

    public ReceiveTaskHandler(KnowledgeRuntime ksession) {

       this.ksession = ksession;

    }

 

    public void setKnowledgeRuntime(KnowledgeRuntime ksession) {

        this.ksession = ksession;

    }

 

    private String constructKey(long   processInstanceId, String messageId) {

        return processInstanceId + "|" + messageId;

    }

   

    @Override

    public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

        long processInstanceId = workItem.getProcessInstanceId();

        String messageId = (String) workItem.getParameter("MessageId");

        waiting.put(constructKey(processInstanceId, messageId), workItem.getId());

        // If waiting map previously contained a mapping for messageId, the old value is replaced !!!

    }

   

    public void messageReceived(String messageId, Object message) {

        throw new UnsupportedOperationException("messageReceived(String messageId, Object message) method is not supported. " +

                                                "Instead use messageReceived(long, String, Object) method" );

    }

   

    private Long findWorkItemId(NodeInstance prmNodeInstance,

                                String       prmMessageId) {

        Long workItemId = null;

        if (prmNodeInstance instanceof WorkItemNodeInstance) {

            WorkItemNodeInstance workItemNode = (WorkItemNodeInstance)prmNodeInstance;

            WorkItem workItem = workItemNode.getWorkItem();

            if (workItem.getName().equals("Receive Task") &&

                workItem.getParameter("MessageId").equals(prmMessageId)) {

                workItemId = workItem.getId();

            }

        }

        if (prmNodeInstance instanceof CompositeContextNodeInstance) {

            for (NodeInstance ni : ((CompositeContextNodeInstance)prmNodeInstance).getNodeInstances(false)) {

                workItemId = findWorkItemId(ni, prmMessageId);

                if (workItemId != null) {

                    break;

                }

            }

        }

        return workItemId;

    }

   

    public void messageReceived(long processInstanceId, String  messageId, Object message) {

        Long workItemId = waiting.get(constructKey(processInstanceId, messageId));

        if (workItemId == null) {

            // See if this is a work item persisted pefore the re-start of the server

            ProcessInstance pi = ksession.getProcessInstance(processInstanceId);

            final WorkflowProcessInstance workflowProcessInstance = ((WorkflowProcessInstance) pi);

            for (NodeInstance nodeInstance : workflowProcessInstance.getNodeInstances()) {

                workItemId = findWorkItemId(nodeInstance, messageId);

                if (workItemId != null) {

                    break;

                }

            }

        }

        if (workItemId != null) {

            Map<String, Object> results = new HashMap<String, Object>();

            results.put("Message", message);

            ksession.getWorkItemManager().completeWorkItem(workItemId, results);

        } else {

            logger.error("No workItem found for receiveTask of process with id:" + processInstanceId);

        }

    }

   

    @Override

    public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {

        long processInstanceId = workItem.getProcessInstanceId();

        String messageId = (String) workItem.getParameter("MessageId");

        waiting.remove(constructKey(processInstanceId, messageId));

    }

}

Reply to this message by going to Community

Start a new discussion in jBPM at Community