JBoss Community

Concurrency problems with jBPM 5.4

created by Jon Kranes in jBPM

I am working on a jBPM 5.4 / JBoss AS 7.1.1 application and have been unable to get it to run reliably with multiple threads.

 

The basic approach I am taking is as follows:

 

  • Process instances are triggered by an incoming message (AMQP / RabbitMQ) received by a message listener. For optimal performance I need to be able to use multiple listener threads. (Ultimately there will be multiple application server hosts as well, but first things first.)
  • I am creating a new StatefulKnowledgeSession for each new ProcessInstance.
  • I am using a local HumanTaskService (note that I can trigger the concurrency problems described below even with a workflow that has no user tasks in it).
  • I dispose of the session and its associated handlers immediately after starting the process. If I need to respond to an asynchronous signal (e.g. completion of a human task), I load the session by ID from the database. This is part of the scaling strategy: I need to be able to continue the process at a future time on an entirely different machine.
  • I am mainly using Oracle (11g XE) but have also tested with MySQL and see exactly the same issues.

 

I am using Spring with declarative transactions.  I am using JTA transactions through the jBoss platform JTA transaction manager.

 

Here's what I am experiencing:

 

  • Everything works fine as long as I run a single thread only (i.e. have only one instance of my message listener class). I can run this way for hours without a single exception being thrown.
  • With two or more message listeners, I get exceptions (not necessarily right away, but if I run enough iterations it happens pretty quickly).
  • I see a variety of different exceptions each time I run the program. They vary, but all relate to failed commits, invalid sessions, data that cannot be found (the session or process instance), deadlocks, concurrent modification exceptions, and so on: all the kinds of things you would expect from code that is not thread-safe.
  • Once an exception is thrown on a given thread, that thread seems to keep throwing the 'Entity Manager is Closed' exception. I am guessing that something tied to the thread through a ThreadLocal never gets cleaned up once problems occur on that thread. If I run long enough, all threads end up with this problem and the system basically becomes useless and needs a full restart.
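
To make the ThreadLocal guess in the last point concrete, here is a minimal, self-contained sketch. It is not jBPM or Hibernate code: FakeEntityManager and StaleThreadLocalDemo are hypothetical stand-ins, and the assumption that a failure leaves a closed object cached on the thread is exactly the part I have not confirmed.

```java
import java.util.ArrayList;
import java.util.List;

class StaleThreadLocalDemo {

    /** Hypothetical stand-in for an entity manager; not a real JPA class. */
    static class FakeEntityManager {
        private boolean open = true;

        void close() { open = false; }

        String persist(String item) {
            if (!open) {
                throw new IllegalStateException("Entity Manager is Closed");
            }
            return "persisted " + item;
        }
    }

    // One cached instance per thread, created lazily on first access.
    private static final ThreadLocal<FakeEntityManager> CACHE =
            ThreadLocal.withInitial(FakeEntityManager::new);

    /** Simulates handling one message; a failure closes the cached instance. */
    static String handle(String item, boolean fail, boolean clearOnFailure) {
        FakeEntityManager em = CACHE.get();
        try {
            if (fail) {
                em.close(); // assumption: the failed transaction closes the resource
                throw new IllegalStateException("simulated commit failure");
            }
            return em.persist(item);
        } catch (IllegalStateException e) {
            if (clearOnFailure) {
                CACHE.remove(); // the next call on this thread gets a fresh instance
            }
            return "error: " + e.getMessage();
        }
    }

    /** Handles three messages on the calling thread; the second one fails. */
    static List<String> run(boolean clearOnFailure) {
        CACHE.remove(); // start from a clean slate on the calling thread
        List<String> results = new ArrayList<>();
        results.add(handle("a", false, clearOnFailure));
        results.add(handle("b", true, clearOnFailure));
        results.add(handle("c", false, clearOnFailure));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // third message fails too
        System.out.println(run(true));  // third message succeeds again
    }
}
```

Without the remove() call, every later message on that thread fails with the same error, which matches what I see on my listener threads.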

 

So, rather than debugging each and every exception one at a time, I'm wondering whether there is anything obvious in my code that would make it non-thread-safe. I have read several threads on issues surrounding the StatefulKnowledgeSession, so I have deliberately chosen NOT to share sessions between threads for that reason (and others).

 

A few main questions also:

 

  • Wrapping the Spring JTA PlatformTransactionManager in the DroolsSpringTransactionManager: I have seen this in several examples but have not seen any good explanation of whether or when it is necessary.
  • Use of the DroolsSpringJpaManager: again, I have seen this in examples but found no real documentation explaining it.
  • Should the StatefulKnowledgeSession be created inside a transaction? Must it be the same transaction that it is used in? I know that it has to be disposed of outside the transaction boundaries.
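
For the last question, the ordering I am aiming for is: begin the transaction, create or load the session, do the work, commit, and only then dispose. A minimal sketch of that ordering follows. MiniTransaction is a made-up in-memory class (not JTA and not Spring), so this only illustrates the sequence, not how jBPM actually behaves.

```java
import java.util.ArrayList;
import java.util.List;

class DisposeAfterCommitDemo {

    /** Hypothetical in-memory stand-in for a transaction; not JTA or Spring. */
    static class MiniTransaction {
        private final List<Runnable> afterCompletion = new ArrayList<>();
        private final List<String> events;

        MiniTransaction(List<String> events) {
            this.events = events;
            events.add("begin");
        }

        /** Registers work that must run only once the transaction has ended. */
        void registerAfterCompletion(Runnable callback) {
            afterCompletion.add(callback);
        }

        void commit() {
            events.add("commit");
            runCallbacks();
        }

        void rollback() {
            events.add("rollback");
            runCallbacks();
        }

        private void runCallbacks() {
            for (Runnable callback : afterCompletion) {
                callback.run();
            }
        }
    }

    /** Returns the order of events for one simulated "start process" call. */
    static List<String> startProcess(boolean failDuringWork) {
        List<String> events = new ArrayList<>();
        MiniTransaction tx = new MiniTransaction(events);
        try {
            events.add("create session");
            // Disposal is deferred until after commit or rollback.
            tx.registerAfterCompletion(() -> events.add("dispose session"));
            if (failDuringWork) {
                throw new IllegalStateException("simulated failure");
            }
            events.add("start process");
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(startProcess(false));
        System.out.println(startProcess(true));
    }
}
```

Spring has a comparable after-completion hook in TransactionSynchronizationManager.registerSynchronization(...); whether that is the right place to dispose a jBPM session is part of what I am asking.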

 


Thanks for any help or suggestions.  I'm happy to provide more code as needed; I wanted to keep this limited to what seems relevant rather than zip up an entire complex application and force people to wade through all of it.

 

-- Jon Kranes

 

 

 

Relevant code:

 

I have a wrapper class to hold a reference to the StatefulKnowledgeSession and its associated handlers, to make it easier to manage disposal:

 


public class SessionWrapper {

    private static Logger log = Logger.getLogger(SessionWrapper.class);

    private StatefulKnowledgeSession session;
    private GenericHTWorkItemHandler htHandler;
    private JPAWorkingMemoryDbLogger dbLogger;
    private LocalTaskService localTaskService;

    public void dispose() {
        try {
            log.info("disposing local task service");
            localTaskService.dispose();
        }
        catch (Exception e) {
            log.error(e);
        }
        try {
            // log.info("disposing db logger");
            // this does not do anything.
            // dbLogger.dispose();
        }
        catch (Exception e) {
            log.error(e);
        }
        try {
            log.info("disposing ht handler");
            htHandler.dispose();
        }
        catch (Exception e) {
            log.error(e);
        }
        try {
            log.info("Disposing session ID " + session.getId());
            session.dispose();
        }
        catch (Exception e) {
            log.error(e);
        }
    }


    public StatefulKnowledgeSession getSession() {
        return session;
    }
    public void setSession(StatefulKnowledgeSession session) {
        this.session = session;
    }
    public GenericHTWorkItemHandler getHtHandler() {
        return htHandler;
    }
    public void setHtHandler(GenericHTWorkItemHandler htHandler) {
        this.htHandler = htHandler;
    }
    public JPAWorkingMemoryDbLogger getDbLogger() {
        return dbLogger;
    }
    public void setDbLogger(JPAWorkingMemoryDbLogger dbLogger) {
        this.dbLogger = dbLogger;
    }
    public LocalTaskService getLocalTaskService() {
        return localTaskService;
    }
    public void setLocalTaskService(LocalTaskService localTaskService) {
        this.localTaskService = localTaskService;
    }

}
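
The try/catch blocks in dispose() all follow the same pattern: run one dispose step, log any failure, and carry on with the next step. A rough sketch of how that pattern could be factored out is below. QuietDisposer and ThrowingAction are hypothetical names, and the steps in main are placeholders rather than the real jBPM objects.

```java
import java.util.ArrayList;
import java.util.List;

class QuietDisposer {

    /** An action that may throw a checked exception, e.g. a dispose() call. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    private final List<String> failures = new ArrayList<>();

    /** Runs one dispose step; records the failure and carries on if it throws. */
    void disposeQuietly(String name, ThrowingAction action) {
        try {
            action.run();
        } catch (Exception e) {
            failures.add(name + ": " + e);
        }
    }

    List<String> failures() {
        return failures;
    }

    public static void main(String[] args) {
        QuietDisposer disposer = new QuietDisposer();
        List<String> disposed = new ArrayList<>();
        disposer.disposeQuietly("local task service", () -> disposed.add("local task service"));
        disposer.disposeQuietly("ht handler", () -> { throw new IllegalStateException("boom"); });
        disposer.disposeQuietly("session", () -> disposed.add("session"));
        System.out.println(disposed);            // steps that completed
        System.out.println(disposer.failures()); // steps that threw
    }
}
```

A failing step, including a NullPointerException from a field that was never set, does not prevent the later steps from running, which is the behaviour the original dispose() is after.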
 

 

The SessionWrapper instance is created here:

 


public class KnowledgeSessionFactory {

    @Autowired
    KnowledgeBase kbase;

    @Autowired
    org.jbpm.task.service.TaskService taskService;

    @Autowired
    EntityManagerFactory emf;

    @Autowired
    EventDrivenTaskHandler handler;

    @Autowired
    DroolsSpringTransactionManager txManager;

    @Autowired
    AuditService auditService;

    @Autowired
    AuditTaskHandler auditHandler;

    private static final Logger log = Logger.getLogger(KnowledgeSessionFactory.class);

    public SessionWrapper createKnowledgeSession() {
        log.info("Creating new StatefulKnowledgeSession");
        StatefulKnowledgeSession ksession = null;
        try {
            ksession = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, getEnvironment() );
        }
        catch (Exception e) {
            log.error("Unable to create session!");
            log.error(e);
            throw new RuntimeException(e);
        }
        log.info("Session ID: " + ksession.getId());
        return createSessionWrapper(ksession);
    }

    public SessionWrapper createKnowledgeSession(int sessionId) {
        log.info("Loading StatefulKnowledgeSession for ID: " + sessionId);
        StatefulKnowledgeSession ksession = JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null, getEnvironment());
        return createSessionWrapper(ksession);
    }

    private SessionWrapper createSessionWrapper(StatefulKnowledgeSession ksession) {
        LocalTaskService taskClient = new LocalTaskService(taskService);
        GenericHTWorkItemHandler htHandler = new LocalHTWorkItemHandler(taskClient, ksession);

        ksession.getWorkItemManager().registerWorkItemHandler("Human Task", htHandler);
        ksession.getWorkItemManager().registerWorkItemHandler("Task", handler);
        ksession.getWorkItemManager().registerWorkItemHandler("Audit", auditHandler);
        ksession.addEventListener(new DroolsListener());

        // This allows access of the global in script tasks
        ksession.setGlobal("auditService", auditService);

        JPAWorkingMemoryDbLogger dbLogger = new JPAWorkingMemoryDbLogger(ksession);
        SessionWrapper wrapper = new SessionWrapper();
        wrapper.setSession(ksession);
        wrapper.setHtHandler(htHandler);
        wrapper.setDbLogger(dbLogger);
        wrapper.setLocalTaskService(taskClient);
        return wrapper;
    }

    private Environment getEnvironment() {
        log.info("creating environment");
        // declared locally so that concurrent callers never share an Environment
        Environment env = KnowledgeBaseFactory.newEnvironment();
        env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
        env.set(EnvironmentName.TRANSACTION_MANAGER, txManager);
        PersistenceContextManager persistenceContextManager = new DroolsSpringJpaManager(env);
        env.set(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, persistenceContextManager);
        log.info("environment created");
        return env;
    }

}
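
One thing I have tried to rule out: the listener container runs with concurrency="4" against singleton beans, so anything this factory keeps in an instance field (rather than a local variable) is shared by all listener threads. The sketch below shows why that matters for a per-call object such as the Environment. SharedFieldDemo and its Factory are hypothetical, and the barrier is there only to force the unlucky interleaving deterministically.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedFieldDemo {

    /** Hypothetical singleton factory called by two threads at once. */
    static class Factory {
        // volatile makes writes visible, but the field is still shared by all callers
        private volatile Object env;
        private final CyclicBarrier bothReady = new CyclicBarrier(2);

        /** Stores the per-call object in a field, then reads it back. */
        boolean createWithField() throws Exception {
            Object mine = new Object();
            env = mine;
            bothReady.await();       // both threads have written the field by now
            return env == mine;      // false for whichever thread wrote first
        }

        /** Keeps the per-call object in a local variable. */
        boolean createWithLocal() throws Exception {
            Object mine = new Object();
            Object localEnv = mine;  // lives on this thread's stack only
            bothReady.await();
            return localEnv == mine; // always true
        }
    }

    /** Returns how many of the two concurrent callers got their own object back. */
    static int countCorrect(boolean useField) {
        Factory factory = new Factory();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Boolean> call =
                    () -> useField ? factory.createWithField() : factory.createWithLocal();
            Future<Boolean> first = pool.submit(call);
            Future<Boolean> second = pool.submit(call);
            int correct = 0;
            if (first.get()) correct++;
            if (second.get()) correct++;
            return correct;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("field: " + countCorrect(true));
        System.out.println("local: " + countCorrect(false));
    }
}
```

With the field, one of the two callers ends up holding the other caller's object; with the local variable, both keep their own.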
 
 


Here is the message listener that starts new process instances:

 

public class StartProcessListener implements MessageListener {

    private static Logger log = Logger.getLogger(StartProcessListener.class);

    @Autowired
    EventService service;

    @Autowired
    KnowledgeSessionFactory factory;

    @Autowired
    SerializerMessageConverter converter;

    public void onMessage(Message message) {
        SessionWrapper wrapper = null;
        try {
            Map<String, Object> map = (Map<String, Object>)converter.fromMessage(message);
            Map<String, Object> params = new HashMap<String, Object>();
            params.put("returnStatus", "VALID");
            params.put("return", map.get("return"));

            String processId = (String)map.get("processId");
            try {
                wrapper = service.createSessionWrapper();
                service.startProcess(wrapper, processId, params);
            }
            catch (Exception e) {
                log.error(e);
            }
        }
        catch (Exception e) {
            log.error(e);
        }
        finally {
            if (wrapper != null) {
                wrapper.dispose();
            }
        }
    }

}


And finally the service class that uses declarative transactions:

 

 

public class EventServiceImpl implements EventService {

    private static Logger log = Logger.getLogger(EventServiceImpl.class);

    @Autowired
    KnowledgeSessionFactory factory;

    @Autowired
    TaskService taskService;

    @Override
    @Transactional
    public SessionWrapper createSessionWrapper() {
        return factory.createKnowledgeSession();
    }

    @Transactional
    @Override
    public void startProcess(SessionWrapper wrapper, String processId, Map<String, Object> params) {
        // SessionWrapper wrapper = factory.createKnowledgeSession();
        StatefulKnowledgeSession ksession = wrapper.getSession();

        // initializes the human task handler for the session
        // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);

        // this allows us to insert the processInstance into the session before starting it.
        ProcessInstance processInstance = ksession.createProcessInstance(processId, params);
        // for drools
        ksession.insert(processInstance);

        log.info("Starting process instance");
        ksession.startProcessInstance(processInstance.getId());
        //return wrapper;
    }

    @Transactional
    @Override
    public SessionWrapper completeWorkItem(int sessionId, long workItemId, Map<String, Object> results) {
        SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
        StatefulKnowledgeSession ksession = wrapper.getSession();
        completeWorkItem(ksession, workItemId, results);
        return wrapper;
    }

    @Override
    @Transactional
    public SessionWrapper completeTask(int sessionId, Long taskId, Map<String, Object> results) {
        log.info("Completing task: " + taskId);

        for (String key : results.keySet()) {
            log.info(key + "=" + results.get(key));
        }

        // LocalTaskService lts = new LocalTaskService(taskService);
        // StatefulKnowledgeSession ksession = factory.getKnowledgeSession(sessionId);
        SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
        StatefulKnowledgeSession ksession = wrapper.getSession();

        // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
        org.jbpm.task.TaskService lts = wrapper.getLocalTaskService();
        Task task = lts.getTask(taskId);
        // int sessionId = task.getTaskData().getProcessSessionId();
        // completeWorkItem(sessionId, task.getTaskData().getWorkItemId(), results);

        try {
            lts.start(taskId, "USER");
        }
        catch (Exception e) {
            log.error(e);
        }

        try {
            ContentData contentData = ContentMarshallerHelper.marshal(results, null);
            lts.complete(taskId, "USER", contentData);
            lts.disconnect(); // does this do anything useful?
        }
        catch (Exception e) {
            log.error(e);
        }

        // for drools
        ProcessInstance pi = ksession.getProcessInstance(task.getTaskData().getProcessInstanceId());
        ksession.insert(pi);

        completeWorkItem(ksession, task.getTaskData().getWorkItemId(), results);

        return wrapper;

    }

    private void completeWorkItem(StatefulKnowledgeSession ksession, Long workItemId, Map<String, Object> results) {
        log.info("Completing work item: " + workItemId);
        ksession.getWorkItemManager().completeWorkItem(workItemId, results);
    }

}
 


 

Spring configuration:

 

 

<?xml version="1.0" encoding="UTF-8"?>


<beans ... >
                 
    <context:annotation-config />

    <!-- Loads all classes annotated with @Component or @Service -->
    <context:component-scan base-package="org.mitre.irs.eda.bpm"/>

    <context:property-placeholder location="classpath:eda.properties" order="2"/>

    <jee:jndi-lookup id="emf" jndi-name="persistence/emf"/>

    <!-- Spring JtaTransactionManager -->
    <bean id="jtaTxManager" class="org.springframework.transaction.jta.JtaTransactionManager" />

    <bean id="txManager" class="org.drools.container.spring.beans.persistence.DroolsSpringTransactionManager"
          c:ptm-ref="jtaTxManager" />

    <tx:annotation-driven transaction-manager="jtaTxManager" />

    <jpa:repositories base-package="org.mitre.irs.eda.common.repo" entity-manager-factory-ref="emf" transaction-manager-ref="jtaTxManager" />

    <bean id="jpaDialect" class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" />

    <!-- JBPM -->
    <bean id="sel" class="org.drools.SystemEventListenerFactory"
          factory-method="getSystemEventListener" />

    <bean id="jbpmTaskService" class="org.jbpm.task.service.TaskService"
          c:emf-ref="emf" c:systemEventListener-ref="sel" />

    <bean id="kbaseFactory" class="org.mitre.irs.eda.bpm.jbpm.KnowledgeBaseFactory" />

    <bean id="kbase" factory-bean="kbaseFactory" factory-method="newKnowledgeBase" />

    <!-- Rabbit MQ -->
    <rabbit:connection-factory id="connectionFactory" channel-cache-size="20"
          username="${amqp.username}" password="${amqp.password}" host="${amqp.host}" port="${amqp.port}" />

    <rabbit:admin id="admin" connection-factory="connectionFactory" />
    <bean id="serializerConverter" class="org.springframework.amqp.support.converter.SerializerMessageConverter" />
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="serializerConverter"/>
    <rabbit:queue id="start" name="process.start" />
    <rabbit:queue id="task.start" name="process.task.start" />
    <rabbit:queue id="task.complete" name="process.task.complete" />
    <bean id="startListener" class="org.mitre.irs.eda.bpm.listener.StartProcessListener" />
    <bean id="taskCompleteListener" class="org.mitre.irs.eda.bpm.listener.TaskCompleteListener" />
    <rabbit:listener-container connection-factory="connectionFactory" concurrency="4">
        <rabbit:listener queues="start" ref="startListener" />
        <rabbit:listener queues="task.complete" ref="taskCompleteListener" />
    </rabbit:listener-container>
    

          
</beans>
