Jon Kranes [
https://community.jboss.org/people/jkranes] created the discussion
"Concurrency problems with JBPM 5.4"
To view the discussion, visit:
https://community.jboss.org/message/828867#828867
--------------------------------------------------------------
I am working on a JBPM 5.4 / jBoss 7.1.1 application and have been unable to get it to run
reliably using multiple threads.
The basic approach I am taking is as follows:
* Process instances are triggered by an incoming message (AMQP/ RabbitMQ) received by a
message listener. For optimal performance I need to be able to use multiple listener
threads. (Ultimately there will be multiple application server host instances as well
but first things first).
* I am creating a new StatefulKnowlegeSession for each new ProcessInstance.
* I am using a local HumanTaskService (note that I am able to trigger concurrency problems
mentioned below even while using a workflow that has no user tasks in it).
* I am disposing of the session and its associated handlers immediately after starting the
process. If I need to respond to an asynch signal (e.g. completion of a human task) I
create the session by ID from the database (this is part of the scaling strategy - need to
be able to continue the process at a future time on an entirely different machine).
* Mainly using Oracle (11g XE) but have also tested with MySQL and see the exact same
issues.
I am using Spring with declarative transactions. I am using JTA transactions through the
jBoss platform JTA transaction manager.
Here's what I am experiencing:
* Everything works fine as long as as I run a single thread only (i.e. have only one
instance of my message listener class). I can run this way for hours without a single
exception being thrown.
* With two or more message listeners, I get exceptions (not necessarily right away, but if
I run enough iterations it happens pretty quickly).
* I see a variety of different exceptions each time I run the program. The exceptions
vary but all are related to inability to commit, invalid sessions, inability to find a
certain piece of data (the session or process instance), deadlocks, concurrent
modification exceptions, etc. All the kinds of things you would expect in non threadsafe
code.
* Once an exception is thrown on a given thread, it seems that the thread will just keep
throwing the 'Entity Manager is Closed' exception. I am guessing there is
something happenging here with a ThreadLocal tied to the thread that never gets cleared up
once problems occur on that thread. If I run long enough, all threads end up with this
problem and the system basically becomes useless and needs a full restart.
So without trying to debug each and every exception one at a time, I'm wondering if
there is anything obvious I am doing in my code that would lead to a non threadsafe
condition. I have read several threads on issues surrounding the StatefulKnowledgeSession
so I have deliberately chosen to NOT share sessions between threads for that reason (and
others).
A few main questions also:
* Wrapping the Spring JTA PlatformTransactionManager in the
DroolsSpringTransactionManager: i have seen this in several examples but have not seen any
good explanation of whether/when it is necessary.
* Use of the DroolsSpringJPAManager: again, have seen this in examples but no real
documentation explaining it.
* Should the StatefulKnowledgeSession be created inside a transaction? Must it be the
same transaction that it is used in? I know that it has to be disposed outside of the
transaction boundaries.
Thanks for any help or suggestions. I'm happy to provide more code as needed; I
wanted to keep this limited to what seems relevant rather than zip up an entire complex
application and force people to wade through all of it.
-- Jon Kranes
Relevant code:
I have a wrapper class to hold a reference to the StatefulKnowledgeSession and its
associated handlers, to make it easier to manage disposal:
public class SessionWrapper {
private static Logger log = Logger.getLogger(SessionWrapper.class);
private StatefulKnowledgeSession session;
private GenericHTWorkItemHandler htHandler;
private JPAWorkingMemoryDbLogger dbLogger;
private LocalTaskService localTaskService;
public void dispose() {
try {
log.info("disposing
local task service");
localTaskService.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
// log.info("disposing
db logger");
// this does not do
anything.
// dbLogger.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
log.info("disposing ht
handler");
htHandler.dispose();
}
catch (Exception e) {
log.error(e);
}
try {
log.info("Disposing
session ID " + session.getId());
session.dispose();
}
catch (Exception e) {
log.error(e);
}
}
public StatefulKnowledgeSession getSession() {
return session;
}
public void setSession(StatefulKnowledgeSession session) {
this.session = session;
}
public GenericHTWorkItemHandler getHtHandler() {
return htHandler;
}
public void setHtHandler(GenericHTWorkItemHandler htHandler) {
this.htHandler = htHandler;
}
public JPAWorkingMemoryDbLogger getDbLogger() {
return dbLogger;
}
public void setDbLogger(JPAWorkingMemoryDbLogger dbLogger) {
this.dbLogger = dbLogger;
}
public LocalTaskService getLocalTaskService() {
return localTaskService;
}
public void setLocalTaskService(LocalTaskService localTaskService) {
this.localTaskService = localTaskService;
}
}
The SessionWrapper instance is created here
public class KnowledgeSessionFactory {
@Autowired
KnowledgeBase kbase;
@Autowired
org.jbpm.task.service.TaskService taskService;
@Autowired
EntityManagerFactory emf;
@Autowired
EventDrivenTaskHandler handler;
@Autowired
DroolsSpringTransactionManager txManager;
@Autowired
AuditService auditService;
@Autowired
AuditTaskHandler auditHandler;
private static final Logger log = Logger.getLogger(KnowledgeSessionFactory.class);
public SessionWrapper createKnowledgeSession() {
log.info("Creating new StatefulKnowledgeSession");
StatefulKnowledgeSession ksession = null;
try {
ksession =
JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, getEnvironment() );
}
catch (Exception e) {
log.error("Unable to
create sesssion!");
log.error(e);
throw new
RuntimeException(e);
}
log.info("Session ID: " + ksession.getId());
return createSessionWrapper(ksession);
}
public SessionWrapper createKnowledgeSession(int sessionId) {
log.info("Loading StatefulKnowledgeSession for ID: " + sessionId);
StatefulKnowledgeSession ksession =
JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null,
getEnvironment());
return createSessionWrapper(ksession);
}
private SessionWrapper createSessionWrapper(StatefulKnowledgeSession ksession) {
LocalTaskService taskClient = new LocalTaskService(taskService);
GenericHTWorkItemHandler htHandler = new LocalHTWorkItemHandler(taskClient, ksession);
ksession.getWorkItemManager().registerWorkItemHandler("Human Task",
htHandler);
ksession.getWorkItemManager().registerWorkItemHandler("Task", handler);
ksession.getWorkItemManager().registerWorkItemHandler("Audit", auditHandler);
ksession.addEventListener(new DroolsListener());
// This allows access of the global in script tasks
ksession.setGlobal("auditService", auditService);
JPAWorkingMemoryDbLogger dbLogger = new JPAWorkingMemoryDbLogger(ksession);
SessionWrapper wrapper = new SessionWrapper();
wrapper.setSession(ksession);
wrapper.setHtHandler(htHandler);
wrapper.setDbLogger(dbLogger);
wrapper.setLocalTaskService(taskClient);
return wrapper;
}
private Environment getEnvironment() {
log.info("creating environment");
env = KnowledgeBaseFactory.newEnvironment();
env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
env.set(EnvironmentName.TRANSACTION_MANAGER, txManager);
PersistenceContextManager persistenceContextManager = new DroolsSpringJpaManager(env);
env.set(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, persistenceContextManager);
log.info("environment created");
return env;
}
}
Here is the message driven bean that initiates starting new process instances:
public class StartProcessListener implements MessageListener {
private static Logger log = Logger.getLogger(StartProcessListener.class);
@Autowired
EventService service;
@Autowired
KnowledgeSessionFactory factory;
@Autowired
SerializerMessageConverter converter;
public void onMessage(Message message) {
SessionWrapper wrapper = null;
try {
Map<String, Object> map
= (Map<String, Object>)converter.fromMessage(message);
Map<String, Object>
params = new HashMap<String, Object>();
params.put("returnStatus", "VALID");
params.put("return", map.get("return"));
String processId =
(String)map.get("processId");
try {
wrapper =
service.createSessionWrapper();
service.startProcess(wrapper, processId, params);
}
catch (Exception e) {
log.error(e);
}
}
catch (Exception e) {
log.error(e);
}
finally {
if (wrapper != null) {
wrapper.dispose();
}
}
}
And finally the service class that uses declarative transactions:
public class EventServiceImpl implements EventService {
private static Logger log = Logger.getLogger(EventServiceImpl.class);
@Autowired
KnowledgeSessionFactory factory;
@Autowired
TaskService taskService;
@Override
@Transactional
public SessionWrapper createSessionWrapper() {
return factory.createKnowledgeSession();
}
@Transactional
@Override
public void startProcess(SessionWrapper wrapper, String processId, Map<String,
Object> params) {
// SessionWrapper wrapper = factory.createKnowledgeSession();
StatefulKnowledgeSession ksession = wrapper.getSession();
// initializes the human task handler for the session
// org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
// this allows us to insert the processInstance into the session before starting it.
ProcessInstance processInstance = ksession.createProcessInstance(processId, params);
// for drools
ksession.insert(processInstance);
log.info("Starting process instance");
ksession.startProcessInstance(processInstance.getId());
//return wrapper;
}
@Transactional
@Override
public SessionWrapper completeWorkItem(int sessionId, long workItemId,
Map<String, Object> results) {
SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
StatefulKnowledgeSession ksession = wrapper.getSession();
completeWorkItem(ksession, workItemId, results);
return wrapper;
}
@Override
@Transactional
public SessionWrapper completeTask(int sessionId, Long taskId, Map<String, Object>
results) {
log.info("Completing task: " + taskId);
for (String key : results.keySet()) {
log.info(key + "="
+ results.get(key));
}
// LocalTaskService lts = new LocalTaskService(taskService);
// StatefulKnowledgeSession ksession = factory.getKnowledgeSession(sessionId);
SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
StatefulKnowledgeSession ksession = wrapper.getSession();
// org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
org.jbpm.task.TaskService lts = wrapper.getLocalTaskService();
Task task = lts.getTask(taskId);
// int sessionId = task.getTaskData().getProcessSessionId();
// completeWorkItem(sessionId, task.getTaskData().getWorkItemId(), results);
try {
lts.start(taskId,
"USER");
}
catch (Exception e) {
log.error(e);
}
try {
ContentData contentData =
ContentMarshallerHelper.marshal(results, null);
lts.complete(taskId,
"USER", contentData);
lts.disconnect(); // does
this do anything useful?
}
catch (Exception e) {
log.error(e);
}
// for drools
ProcessInstance pi =
ksession.getProcessInstance(task.getTaskData().getProcessInstanceId());
ksession.insert(pi);
completeWorkItem(ksession, task.getTaskData().getWorkItemId(), results);
return wrapper;
}
private void completeWorkItem(StatefulKnowledgeSession ksession, Long workItemId,
Map<String, Object> results) {
log.info("Completing work item: " + workItemId);
ksession.getWorkItemManager().completeWorkItem(workItemId, results);
}
}
Spring configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans ... >
<context:annotation-config />
<!-- Loads all classes annotated with @Component or @Service -->
<context:component-scan base-package="org.mitre.irs.eda.bpm"/>
<context:property-placeholder location="classpath:eda.properties"
order="2"/>
<jee:jndi-lookup id="emf"
jndi-name="persistence/emf"/>
<!-- Spring JtaTransactionManager -->
<bean id="jtaTxManager"
class="org.springframework.transaction.jta.JtaTransactionManager" />
<bean id="txManager"
class="org.drools.container.spring.beans.persistence.DroolsSpringTransactionManager"
c:ptm-ref="jtaTxManager" />
<tx:annotation-driven transaction-manager="jtaTxManager" />
<jpa:repositories base-package="org.mitre.irs.eda.common.repo"
entity-manager-factory-ref="emf"
transaction-manager-ref="jtaTxManager" />
<bean id="jpaDialect"
class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" />
<!-- JBPM -->
<bean id="sel"
class="org.drools.SystemEventListenerFactory"
factory-method="getSystemEventListener" />
<bean id="jbpmTaskService"
class="org.jbpm.task.service.TaskService"
c:emf-ref="emf" c:systemEventListener-ref="sel"
/>
<bean id="kbaseFactory"
class="org.mitre.irs.eda.bpm.jbpm.KnowledgeBaseFactory" />
<bean id="kbase" factory-bean="kbaseFactory"
factory-method="newKnowledgeBase" />
<!-- Rabbit MQ -->
<rabbit:connection-factory id="connectionFactory"
channel-cache-size="20"
username="${amqp.username}"
password="${amqp.password}" host="${amqp.host}"
port="${amqp.port}" />
<rabbit:admin id="admin"
connection-factory="connectionFactory" />
<bean id="serializerConverter"
class="org.springframework.amqp.support.converter.SerializerMessageConverter"
/>
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
message-converter="serializerConverter"/>
<rabbit:queue id="start" name="process.start" />
<rabbit:queue id="task.start" name="process.task.start"
/>
<rabbit:queue id="task.complete"
name="process.task.complete" />
<bean id="startListener"
class="org.mitre.irs.eda.bpm.listener.StartProcessListener" />
<bean id="taskCompleteListener"
class="org.mitre.irs.eda.bpm.listener.TaskCompleteListener" />
<rabbit:listener-container connection-factory="connectionFactory"
concurrency="4">
<rabbit:listener queues="start"
ref="startListener" />
<rabbit:listener queues="task.complete"
ref="taskCompleteListener" />
</rabbit:listener-container>
</beans>
--------------------------------------------------------------
Reply to this message by going to Community
[
https://community.jboss.org/message/828867#828867]
Start a new discussion in jBPM at Community
[
https://community.jboss.org/choose-container!input.jspa?contentType=1&...]