I am working on a JBPM 5.4 / jBoss 7.1.1 application and have been unable to get it to run reliably using multiple threads.
The basic approach I am taking is as follows:
- Process instances are triggered by an incoming message (AMQP/ RabbitMQ) received by a message listener. For optimal performance I need to be able to use multiple listener threads. (Ultimately there will be multiple application server host instances as well but first things first).
- I am creating a new StatefulKnowlegeSession for each new ProcessInstance.
- I am using a local HumanTaskService (note that I am able to trigger concurrency problems mentioned below even while using a workflow that has no user tasks in it).
- I am disposing of the session and its associated handlers immediately after starting the process. If I need to respond to an asynch signal (e.g. completion of a human task) I create the session by ID from the database (this is part of the scaling strategy - need to be able to continue the process at a future time on an entirely different machine).
- Mainly using Oracle (11g XE) but have also tested with MySQL and see the exact same issues.
I am using Spring with declarative transactions. I am using JTA transactions through the jBoss platform JTA transaction manager.
Here's what I am experiencing:
- Everything works fine as long as as I run a single thread only (i.e. have only one instance of my message listener class). I can run this way for hours without a single exception being thrown.
- With two or more message listeners, I get exceptions (not necessarily right away, but if I run enough iterations it happens pretty quickly).
- I see a variety of different exceptions each time I run the program. The exceptions vary but all are related to inability to commit, invalid sessions, inability to find a certain piece of data (the session or process instance), deadlocks, concurrent modification exceptions, etc. All the kinds of things you would expect in non threadsafe code.
- Once an exception is thrown on a given thread, it seems that the thread will just keep throwing the 'Entity Manager is Closed' exception. I am guessing there is something happenging here with a ThreadLocal tied to the thread that never gets cleared up once problems occur on that thread. If I run long enough, all threads end up with this problem and the system basically becomes useless and needs a full restart.
So without trying to debug each and every exception one at a time, I'm wondering if there is anything obvious I am doing in my code that would lead to a non threadsafe condition. I have read several threads on issues surrounding the StatefulKnowledgeSession so I have deliberately chosen to NOT share sessions between threads for that reason (and others).
A few main questions also:
- Wrapping the Spring JTA PlatformTransactionManager in the DroolsSpringTransactionManager: i have seen this in several examples but have not seen any good explanation of whether/when it is necessary.
- Use of the DroolsSpringJPAManager: again, have seen this in examples but no real documentation explaining it.
- Should the StatefulKnowledgeSession be created inside a transaction? Must it be the same transaction that it is used in? I know that it has to be disposed outside of the transaction boundaries.
Thanks for any help or suggestions. I'm happy to provide more code as needed; I wanted to keep this limited to what seems relevant rather than zip up an entire complex application and force people to wade through all of it.
-- Jon Kranes
Relevant code:
I have a wrapper class to hold a reference to the StatefulKnowledgeSession and its associated handlers, to make it easier to manage disposal:
public class SessionWrapper {
private static Logger log = Logger.getLogger(SessionWrapper.class);
private StatefulKnowledgeSession session;
private GenericHTWorkItemHandler htHandler;
private JPAWorkingMemoryDbLogger dbLogger;
private LocalTaskService localTaskService;
public void dispose() {
try {
log.info("disposing local task service");
localTaskService.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
// log.info("disposing db logger");
// this does not do anything.
// dbLogger.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
log.info("disposing ht handler");
htHandler.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
log.info("Disposing session ID " + session.getId());
session.dispose();
}
catch (Exception e) {
log.error(e);
}
}
public StatefulKnowledgeSession getSession() {
return session;
}
public void setSession(StatefulKnowledgeSession session) {
this.session = session;
}
public GenericHTWorkItemHandler getHtHandler() {
return htHandler;
}
public void setHtHandler(GenericHTWorkItemHandler htHandler) {
this.htHandler = htHandler;
}
public JPAWorkingMemoryDbLogger getDbLogger() {
return dbLogger;
}
public void setDbLogger(JPAWorkingMemoryDbLogger dbLogger) {
this.dbLogger = dbLogger;
}
public LocalTaskService getLocalTaskService() {
return localTaskService;
}
public void setLocalTaskService(LocalTaskService localTaskService) {
this.localTaskService = localTaskService;
}
}
The SessionWrapper instance is created here
public class KnowledgeSessionFactory {
@Autowired
KnowledgeBase kbase;
@Autowired
org.jbpm.task.service.TaskService taskService;
@Autowired
EntityManagerFactory emf;
@Autowired
EventDrivenTaskHandler handler;
@Autowired
DroolsSpringTransactionManager txManager;
@Autowired
AuditService auditService;
@Autowired
AuditTaskHandler auditHandler;
private static final Logger log = Logger.getLogger(KnowledgeSessionFactory.class);
public SessionWrapper createKnowledgeSession() {
log.info("Creating new StatefulKnowledgeSession");
StatefulKnowledgeSession ksession = null;
try {
ksession = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, getEnvironment() );
}
catch (Exception e) {
log.error("Unable to create sesssion!");
log.error(e);
throw new RuntimeException(e);
}
log.info("Session ID: " + ksession.getId());
return createSessionWrapper(ksession);
}
public SessionWrapper createKnowledgeSession(int sessionId) {
log.info("Loading StatefulKnowledgeSession for ID: " + sessionId);
StatefulKnowledgeSession ksession = JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null, getEnvironment());
return createSessionWrapper(ksession);
}
private SessionWrapper createSessionWrapper(StatefulKnowledgeSession ksession) {
LocalTaskService taskClient = new LocalTaskService(taskService);
GenericHTWorkItemHandler htHandler = new LocalHTWorkItemHandler(taskClient, ksession);
ksession.getWorkItemManager().registerWorkItemHandler("Human Task", htHandler);
ksession.getWorkItemManager().registerWorkItemHandler("Task", handler);
ksession.getWorkItemManager().registerWorkItemHandler("Audit", auditHandler);
ksession.addEventListener(new DroolsListener());
// This allows access of the global in script tasks
ksession.setGlobal("auditService", auditService);
JPAWorkingMemoryDbLogger dbLogger = new JPAWorkingMemoryDbLogger(ksession);
SessionWrapper wrapper = new SessionWrapper();
wrapper.setSession(ksession);
wrapper.setHtHandler(htHandler);
wrapper.setDbLogger(dbLogger);
wrapper.setLocalTaskService(taskClient);
return wrapper;
}
private Environment getEnvironment() {
log.info("creating environment");
env = KnowledgeBaseFactory.newEnvironment();
env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
env.set(EnvironmentName.TRANSACTION_MANAGER, txManager);
PersistenceContextManager persistenceContextManager = new DroolsSpringJpaManager(env);
env.set(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, persistenceContextManager);
log.info("environment created");
return env;
}
}
Here is the message driven bean that initiates starting new process instances:
public class StartProcessListener implements MessageListener {
private static Logger log = Logger.getLogger(StartProcessListener.class);
@Autowired
EventService service;
@Autowired
KnowledgeSessionFactory factory;
@Autowired
SerializerMessageConverter converter;
public void onMessage(Message message) {
SessionWrapper wrapper = null;
try {
Map<String, Object> map = (Map<String, Object>)converter.fromMessage(message);
Map<String, Object> params = new HashMap<String, Object>();
params.put("returnStatus", "VALID");
params.put("return", map.get("return"));
String processId = (String)map.get("processId");
try {
wrapper = service.createSessionWrapper();
service.startProcess(wrapper, processId, params);
}
catch (Exception e) {
log.error(e);
}
}
catch (Exception e) {
log.error(e);
}
finally {
if (wrapper != null) {
wrapper.dispose();
}
}
}
And finally the service class that uses declarative transactions:
public class EventServiceImpl implements EventService {
private static Logger log = Logger.getLogger(EventServiceImpl.class);
@Autowired
KnowledgeSessionFactory factory;
@Autowired
TaskService taskService;
@Override
@Transactional
public SessionWrapper createSessionWrapper() {
return factory.createKnowledgeSession();
}
@Transactional
@Override
public void startProcess(SessionWrapper wrapper, String processId, Map<String, Object> params) {
// SessionWrapper wrapper = factory.createKnowledgeSession();
StatefulKnowledgeSession ksession = wrapper.getSession();
// initializes the human task handler for the session
// org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
// this allows us to insert the processInstance into the session before starting it.
ProcessInstance processInstance = ksession.createProcessInstance(processId, params);
// for drools
ksession.insert(processInstance);
log.info("Starting process instance");
ksession.startProcessInstance(processInstance.getId());
//return wrapper;
}
@Transactional
@Override
public SessionWrapper completeWorkItem(int sessionId, long workItemId, Map<String, Object> results) {
SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
StatefulKnowledgeSession ksession = wrapper.getSession();
completeWorkItem(ksession, workItemId, results);
return wrapper;
}
@Override
@Transactional
public SessionWrapper completeTask(int sessionId, Long taskId, Map<String, Object> results) {
log.info("Completing task: " + taskId);
for (String key : results.keySet()) {
log.info(key + "=" + results.get(key));
}
// LocalTaskService lts = new LocalTaskService(taskService);
// StatefulKnowledgeSession ksession = factory.getKnowledgeSession(sessionId);
SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
StatefulKnowledgeSession ksession = wrapper.getSession();
// org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
org.jbpm.task.TaskService lts = wrapper.getLocalTaskService();
Task task = lts.getTask(taskId);
// int sessionId = task.getTaskData().getProcessSessionId();
// completeWorkItem(sessionId, task.getTaskData().getWorkItemId(), results);
try {
lts.start(taskId, "USER");
}
catch (Exception e) {
log.error(e);
}
try {
ContentData contentData = ContentMarshallerHelper.marshal(results, null);
lts.complete(taskId, "USER", contentData);
lts.disconnect(); // does this do anything useful?
}
catch (Exception e) {
log.error(e);
}
// for drools
ProcessInstance pi = ksession.getProcessInstance(task.getTaskData().getProcessInstanceId());
ksession.insert(pi);
completeWorkItem(ksession, task.getTaskData().getWorkItemId(), results);
return wrapper;
}
private void completeWorkItem(StatefulKnowledgeSession ksession, Long workItemId, Map<String, Object> results) {
log.info("Completing work item: " + workItemId);
ksession.getWorkItemManager().completeWorkItem(workItemId, results);
}
}
Spring configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans ... >
<context:annotation-config />
<rabbit:connection-factory id="connectionFactory" channel-cache-size="20"
username="${amqp.username}" password="${amqp.password}" host="${amqp.host}" port="${amqp.port}" />
<rabbit:admin id="admin" connection-factory="connectionFactory" />
<bean id="serializerConverter" class="org.springframework.amqp.support.converter.SerializerMessageConverter" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="serializerConverter"/>
<rabbit:queue id="start" name="process.start" />
<rabbit:queue id="task.start" name="process.task.start" />
<rabbit:queue id="task.complete" name="process.task.complete" />
<bean id="startListener" class="org.mitre.irs.eda.bpm.listener.StartProcessListener" />
<bean id="taskCompleteListener" class="org.mitre.irs.eda.bpm.listener.TaskCompleteListener" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="4">
<rabbit:listener queues="start" ref="startListener" />
<rabbit:listener queues="task.complete" ref="taskCompleteListener" />
</rabbit:listener-container>
</beans>