Gareth Edwards [
https://community.jboss.org/people/garethed] created the discussion
"Re: Problem with multiple sessions"
To view the discussion, visit:
https://community.jboss.org/message/806960#806960
--------------------------------------------------------------
Cheers,
Here are the main classes. I have removed some irrelevant methods and renamed certain
variables.
I changed to this:
StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
s.execute(dc);
but still see the same behaviour.
package com.x.api.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import javax.persistence.EntityManagerFactory;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.event.process.DefaultProcessEventListener;
import org.drools.event.process.ProcessCompletedEvent;
import org.drools.persistence.jpa.JPAKnowledgeService;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.audit.JPAWorkingMemoryDbLogger;
import org.jbpm.process.workitem.wsht.AsyncHornetQHTWorkItemHandler;
import org.jbpm.task.service.hornetq.AsyncHornetQTaskClient;
import org.jbpm.task.utils.OnErrorAction;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.x.api.jbpm.handler.BiometricVerificationHandler;
import com.x.api.jbpm.handler.WorkflowCurrentTaskHandler;
import com.x.api.jbpm.listener.WorkflowEventListener;
import com.x.api.model.WorkflowSession;
import com.x.api.model.WorkflowSettings;
import com.x.api.model.user.SessionUser;
import com.x.api.model.user.User;
import com.x.api.service.PackageService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.util.CMTDisposeCommand;
import com.x.api.util.xAPIException;
public class WorkflowSessionServiceImpl implements InitializingBean,
WorkflowSessionService {
private static Logger log = Logger.getLogger(WorkflowSessionServiceImpl.class);
@Autowired
private PackageService packageService;
@Autowired
private WorkflowSessionService WorkflowSessionService;
private StatefulKnowledgeSession ksession = null;
private ArrayList<WorkflowSessionTask> workFlowSessionTasks;
private Queue<WorkflowSettings> workflowQueue = new
LinkedList<WorkflowSettings>();
@Value("${HumanTaskServiceIp}")
private String ipAddress;
@Override
public void afterPropertiesSet() throws Exception {
workFlowSessionTasks = new
ArrayList<WorkflowSessionServiceImpl.WorkflowSessionTask>();
}
@Override
public List<WorkflowSession> resumeWorkflowSessions(User user)throws
xAPIException{
List<WorkflowSession> workflowSessions =
WorkflowSessionService.getActiveSessions();
List<WorkflowSession> resumedSessions = new
ArrayList<WorkflowSession>();
for (WorkflowSession WorkflowSession : workflowSessions) {
if (!isSessionLoaded(WorkflowSession.getId())){
if (this.resumeWorkflowSession(WorkflowSession,
user))
resumedSessions.add(WorkflowSession);
}
}
return resumedSessions;
}
private boolean resumeWorkflowSession(WorkflowSession pws, User user)throws
xAPIException{
boolean success = false;
EntityManagerFactory emf = packageService.getEntityManagerFactory();
Environment env = KnowledgeBaseFactory.newEnvironment();
KnowledgeBase kbase;
try {
kbase = packageService.getKnowledgeBase(pws.getPackageRef(),
pws.getPackageVersion());
env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
StatefulKnowledgeSession sks =
JPAKnowledgeService.loadStatefulKnowledgeSession(pws.getId(), kbase, null, env);
WorkflowSettings wfs = new WorkflowSettings();
wfs.setActive(true);
wfs.setCompleted(false);
wfs.setId(sks.getId());
wfs.setPackageName(pws.getPackageRef());
wfs.setPackageVersion(pws.getPackageVersion());
WorkflowSessionTask wst = new
WorkflowSessionTask(WorkflowSessionService, sks, wfs);
// Make a copy of the user to enable updates of workflowSession
table
final User u = new SessionUser(user.getId());
wst.startWorkFlow(false, u);
workFlowSessionTasks.add(wst);
success = true;
} catch (Exception e) {
success = false;
//throw new xAPIException("Error getting knowledge
base:" + e.getLocalizedMessage());
}
return success;
}
private boolean isSessionLoaded(int sessionId){
for (WorkflowSessionTask workflowSessionTask : workFlowSessionTasks)
{
if (workflowSessionTask.ksession.getId() == sessionId
&& !workflowSessionTask.completed){
return true;
}
}
return false;
}
@Override
public synchronized int startNewSession(WorkflowSettings wfs) throws
xAPIException{
purgeDeadSessions();
KnowledgeBase kbase =
packageService.getKnowledgeBase(wfs.getPackageName(), wfs.getPackageVersion());
//StatefulKnowledgeSession ksession = getSession(kbase);
//if (ksession == null)
ksession = getSession(kbase);
log.debug("Starting new session:" + ksession.getId());
wfs.setSessionId(ksession.getId());
WorkflowSessionTask wst = new
WorkflowSessionTask(WorkflowSessionService, ksession, wfs);
// Make a copy of the user to enable updates of workflowSession table
final User u = new SessionUser(wfs.getUser().getId());
wst.startWorkFlow(true, u);
this.workFlowSessionTasks.add(wst);
log.info("WorkflowSettings:" + wfs.toString());
return ksession.getId();
}
private StatefulKnowledgeSession getSession(KnowledgeBase kb){
Environment env = KnowledgeBaseFactory.newEnvironment();
env.set(EnvironmentName.ENTITY_MANAGER_FACTORY,
packageService.getEntityManagerFactory());
StatefulKnowledgeSession ksession =
JPAKnowledgeService.newStatefulKnowledgeSession(kb, null, env);
return ksession;
}
protected class WorkflowSessionTask{
private StatefulKnowledgeSession ksession;
private WorkflowSettings workflowSettings;
private boolean completed;
private WorkflowSessionService pws;
public WorkflowSessionTask(WorkflowSessionService pws,
StatefulKnowledgeSession session, WorkflowSettings wfs){
this.ksession = session;
this.workflowSettings = wfs;
this.pws = pws;
}
public void startWorkFlow(boolean startProcess, final User user) {
this.completed = false;
JPAWorkingMemoryDbLogger logger = new
JPAWorkingMemoryDbLogger(ksession);
String connectorName = "Hornet" +
UUID.randomUUID().toString();
final AsyncHornetQHTWorkItemHandler humanTaskHandler = new
AsyncHornetQHTWorkItemHandler(new AsyncHornetQTaskClient(connectorName), ksession,
OnErrorAction.LOG);
//final HumanTaskHandler humanTaskHandler = new HumanTaskHandler(new
AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
humanTaskHandler.setIpAddress(ipAddress);
humanTaskHandler.setOwningSessionOnly(true);
final CMTDisposeCommand dc = new CMTDisposeCommand();
dc.setWorkitemhandler(humanTaskHandler);
ksession.getWorkItemManager().registerWorkItemHandler("Human
Task", humanTaskHandler);
ksession.getWorkItemManager().registerWorkItemHandler("UpdateWorkflowCurrentTask",
new WorkflowCurrentTaskHandler(null, ksession.getId(), null));
ksession.addEventListener(new WorkflowEventListener(null,
ksession.getId(), null));
ksession.addEventListener(new
DefaultProcessEventListener(){
@Override
public void
afterProcessCompleted(ProcessCompletedEvent event){
log.info("~~~~~~~~~Workflow
Session:" + ksession.getId() + " Completed~~~~~~~~~");
log.info("Disposing of " +
event.getProcessInstance().getProcessName() + "!");
StatefulKnowledgeSession s =
(StatefulKnowledgeSession) event.getKnowledgeRuntime();
s.execute(dc);
//ksession.execute(dc);
completed = true;
workflowSettings.setCompleted(true);
}
});
if (startProcess)
ksession.startProcess(workflowSettings.getWorkflowName(),workflowSettings.getWorkFlowData());
}
public boolean getCompleted(){
return this.completed;
}
public WorkflowSettings getWorkFlowSettings(){
return this.workflowSettings;
}
}
private void purgeDeadSessions(){
for (Iterator<WorkflowSessionTask> iterator =
workFlowSessionTasks.iterator(); iterator.hasNext();) {
WorkflowSessionTask task = iterator.next();
if (task.getCompleted())
iterator.remove();
}
}
@Override
public synchronized ArrayList<WorkflowSettings> getWorkflowSettings(){
ArrayList<WorkflowSettings> sessions = new
ArrayList<WorkflowSettings>();
for (WorkflowSessionTask task : workFlowSessionTasks ) {
sessions.add(task.getWorkFlowSettings());
}
return sessions;
}
private boolean queueWorkflow(WorkflowSettings wfs){
log.info("Queueing workflow: " + wfs.getWorkflowName());
return this.workflowQueue.add(wfs);
}
public void dequeueAndStartWorkflows(){
log.info("Polling workflowQueue");
WorkflowSettings wfs = null;
do{
wfs = workflowQueue.poll();
if (wfs != null)
try {
this.startNewSession(wfs);
} catch (xAPIException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
while(wfs!=null);
}
}
CMTDisposeCommand
package com.x.api.util;
import javax.naming.InitialContext;
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;
import org.apache.log4j.Logger;
import org.drools.command.Context;
import org.drools.command.impl.GenericCommand;
import org.drools.command.impl.KnowledgeCommandContext;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.workitem.wsht.AsyncGenericHTWorkItemHandler;
public class CMTDisposeCommand implements GenericCommand<Void> {
private static final long serialVersionUID = 1L;
private static Logger log = Logger.getLogger(CMTDisposeCommand.class);
private String tmLookupName = "java:jboss/TransactionManager";
public CMTDisposeCommand() {
}
private AsyncGenericHTWorkItemHandler workitemhandler;
public CMTDisposeCommand(String tmLookup) {
this.tmLookupName = tmLookup;
}
public AsyncGenericHTWorkItemHandler getWorkitemhandler() {
return workitemhandler;
}
public void setWorkitemhandler(AsyncGenericHTWorkItemHandler workitemhandler) {
this.workitemhandler = workitemhandler;
}
@Override
public Void execute(Context context) {
final StatefulKnowledgeSession ksession = ((KnowledgeCommandContext)
context).getStatefulKnowledgesession();
try {
TransactionManager tm = (TransactionManager) new
InitialContext().lookup(tmLookupName);
tm.getTransaction().registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
// not used here
}
@Override
public void afterCompletion(int arg0) {
if (workitemhandler != null)
try {
log.info("Disposing workitemHandler for
session:" + ksession.getId());
workitemhandler.dispose();
} catch (Exception e) {
e.printStackTrace();
}
ksession.dispose();
}
});
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
PackageServiceImpl
package com.x.api.service.impl;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Set;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.io.impl.UrlResource;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.x.api.model.JbpmPackage;
import com.x.api.model.JbpmWorkflow;
import com.x.api.service.PackageService;
import com.x.api.util.APIException;
@Service("packageService")
public class PackageServiceImpl implements InitializingBean, PackageService {
private static Logger log = Logger.getLogger(PackageService.class);
@Value( "${DroolsGuvnorUrl}" )
private String url;
private EntityManagerFactory emf = null;
private HashMap<String, KnowledgeBase> knowLedgeBases = new
HashMap<String, KnowledgeBase>();
private List<JbpmPackage> packageCache;
// keys by package
private Hashtable<String, List<JbpmWorkflow>> workflowCache;
@Override
public void afterPropertiesSet() throws Exception {
// TODO load required knowledgeBases
this.emf = getEntityManagerFactory();
this.workflowCache = new
Hashtable<String,List<JbpmWorkflow>>();
}
@Override
public void clearCaches() {
this.packageCache = null;
this.workflowCache = new
Hashtable<String,List<JbpmWorkflow>>();
}
@Override
public List<JbpmPackage> getJbpmPackages() throws APIException {
// caching to speed up the requests
if (packageCache==null) {
List<JbpmPackage> packages = new
ArrayList<JbpmPackage>();
String authorizationHeader = "Basic " +
org.apache.cxf.common.util.Base64Utility.encode("admin:admin".getBytes());
HttpClient httpclient = new DefaultHttpClient();
try {
URIBuilder builder = new URIBuilder(url
+"/rest/packages");
URI uri = builder.build();
HttpGet httpget = new HttpGet(uri);
httpget.addHeader("Authorization",
authorizationHeader);
httpget.addHeader("Accept",
"application/json");
HttpResponse response =
httpclient.execute(httpget);
if
(response.getStatusLine().getStatusCode()!=HttpStatus.SC_OK)
throw new PIException("Failed to
retrieve list of packages from drools REST
service",response.getStatusLine().getStatusCode());
else {
String json =
IOUtils.toString(response.getEntity().getContent(),"UTF-8");
JSONArray jsonArray = new
JSONArray(json);
for (int
i=0;i<jsonArray.length();i++) {
JSONObject jo =
(JSONObject)jsonArray.get(i);
String title =
jo.getString("title");
if
(title.startsWith("x.")) {
JbpmPackage wp = new
JbpmPackage();
wp.setId(title);
wp.setDisplayTitle(title.substring(title.indexOf("x.")+8).replace("_",
" "));
wp.setDescription(jo.getString("description"));
wp.setPublishedDate(new
Date(jo.getLong("published")));
JSONObject meta =
jo.getJSONObject("metadata");
wp.setVersion(meta.getInt("versionNumber"));
wp.setArchived(meta.getBoolean("archived"));
wp.setCreatedDate(new
Timestamp(meta.getLong("created")));
log.debug("WorkflowPackage:
"+wp.toString());
packages.add(wp);
}
}
}
}
catch (Exception ue) {
throw new APIException(ue);
}
this.packageCache = packages;
}
return this.packageCache;
}
@Override
public KnowledgeBase getKnowledgeBase(String packageName, String version) throws
PIException{
if (packageName == null)
throw new PIException("Package Name must not be
null.");
if (version == null)
version = "LATEST";
log.debug("Package requested: " + packageName +
"/" + version);
String key = packageName + "|" + version;
if (!knowLedgeBases.containsKey(key)){
log.info("Package " + key + " not cached,
attempting to get from repository");
UrlResource resource = (UrlResource)
ResourceFactory.newUrlResource(url + "/org.drools.guvnor.Guvnor/package/" +
packageName + "/" + version);
resource.setBasicAuthentication("enabled");
resource.setUsername("guest");
resource.setPassword("guest");
KnowledgeBuilder kbuilder =
KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(resource, ResourceType.PKG);
knowLedgeBases.put(key, kbuilder.newKnowledgeBase());
}
log.debug(knowLedgeBases.toString());
return knowLedgeBases.get(key);
}
@Override
public EntityManagerFactory getEntityManagerFactory(){
if (emf == null){
emf = Persistence.createEntityManagerFactory(
"org.jbpm.persistence.jpa" );
Environment env = KnowledgeBaseFactory.newEnvironment();
env.set( EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
}
return emf;
}
@Override
public synchronized Set<String> getLoadedKnowledgeBases(){
return knowLedgeBases.keySet();
}
}
Cheers,
Gareth.
--------------------------------------------------------------
Reply to this message by going to Community
[
https://community.jboss.org/message/806960#806960]
Start a new discussion in jBPM at Community
[
https://community.jboss.org/choose-container!input.jspa?contentType=1&...]