[jboss-user] [jBPM] - Re: Problem with multiple sessions
Gareth Edwards
do-not-reply at jboss.com
Mon Apr 8 06:11:19 EDT 2013
Gareth Edwards [https://community.jboss.org/people/garethed] created the discussion
"Re: Problem with multiple sessions"
To view the discussion, visit: https://community.jboss.org/message/806960#806960
--------------------------------------------------------------
Cheers,
Here are the main classes. I have removed some irrelevant methods and renamed certain variables.
I changed the dispose call in afterProcessCompleted to this:
StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
s.execute(dc);
but still see the same behaviour.
package com.x.api.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import javax.persistence.EntityManagerFactory;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.event.process.DefaultProcessEventListener;
import org.drools.event.process.ProcessCompletedEvent;
import org.drools.persistence.jpa.JPAKnowledgeService;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.audit.JPAWorkingMemoryDbLogger;
import org.jbpm.process.workitem.wsht.AsyncHornetQHTWorkItemHandler;
import org.jbpm.task.service.hornetq.AsyncHornetQTaskClient;
import org.jbpm.task.utils.OnErrorAction;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.x.api.jbpm.handler.BiometricVerificationHandler;
import com.x.api.jbpm.handler.WorkflowCurrentTaskHandler;
import com.x.api.jbpm.listener.WorkflowEventListener;
import com.x.api.model.WorkflowSession;
import com.x.api.model.WorkflowSettings;
import com.x.api.model.user.SessionUser;
import com.x.api.model.user.User;
import com.x.api.service.PackageService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.util.CMTDisposeCommand;
import com.x.api.util.xAPIException;
/**
 * Starts, resumes and tracks jBPM knowledge sessions, one {@link WorkflowSessionTask} per
 * session.
 *
 * <p>Every session is created or loaded through {@link JPAKnowledgeService} using the
 * {@code EntityManagerFactory} supplied by the {@link PackageService}. Tasks are kept in
 * {@code workFlowSessionTasks} until their process completes and they are purged by the next
 * call to {@link #startNewSession(WorkflowSettings)}.
 */
public class WorkflowSessionServiceImpl implements InitializingBean, WorkflowSessionService {

    private static final Logger log = Logger.getLogger(WorkflowSessionServiceImpl.class);

    @Autowired
    private PackageService packageService;

    // NOTE(review): the field deliberately keeps its original name (identical to its type);
    // inside this class the simple name resolves to the field, not the interface.
    @Autowired
    private WorkflowSessionService WorkflowSessionService;

    /** Most recently created session; each task holds its own reference to its session. */
    private StatefulKnowledgeSession ksession = null;

    /** Tasks for the sessions this service has started or resumed. */
    private ArrayList<WorkflowSessionTask> workFlowSessionTasks;

    /** Workflows waiting to be started by {@link #dequeueAndStartWorkflows()}. */
    private Queue<WorkflowSettings> workflowQueue = new LinkedList<WorkflowSettings>();

    @Value("${HumanTaskServiceIp}")
    private String ipAddress;

    /** Initialises the (empty) task list once the bean's properties have been injected. */
    @Override
    public void afterPropertiesSet() throws Exception {
        workFlowSessionTasks = new ArrayList<WorkflowSessionServiceImpl.WorkflowSessionTask>();
    }

    /**
     * Reloads every active session that is not already tracked by this service.
     *
     * @param user user on whose behalf the sessions are resumed
     * @return the sessions that were successfully resumed (possibly empty)
     */
    @Override
    public List<WorkflowSession> resumeWorkflowSessions(User user) throws xAPIException {
        List<WorkflowSession> activeSessions = WorkflowSessionService.getActiveSessions();
        List<WorkflowSession> resumedSessions = new ArrayList<WorkflowSession>();
        for (WorkflowSession candidate : activeSessions) {
            if (!isSessionLoaded(candidate.getId()) && this.resumeWorkflowSession(candidate, user)) {
                resumedSessions.add(candidate);
            }
        }
        return resumedSessions;
    }

    /**
     * Loads one persisted session and re-attaches its handlers and listeners without
     * starting a new process.
     *
     * @return {@code true} when the session was loaded and registered, {@code false} when any
     *         step failed (the failure is logged, not rethrown)
     */
    private boolean resumeWorkflowSession(WorkflowSession pws, User user) throws xAPIException {
        EntityManagerFactory emf = packageService.getEntityManagerFactory();
        Environment env = KnowledgeBaseFactory.newEnvironment();
        try {
            KnowledgeBase kbase = packageService.getKnowledgeBase(pws.getPackageRef(), pws.getPackageVersion());
            env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
            StatefulKnowledgeSession sks = JPAKnowledgeService.loadStatefulKnowledgeSession(pws.getId(), kbase, null, env);

            WorkflowSettings wfs = new WorkflowSettings();
            wfs.setActive(true);
            wfs.setCompleted(false);
            wfs.setId(sks.getId());
            wfs.setPackageName(pws.getPackageRef());
            wfs.setPackageVersion(pws.getPackageVersion());

            WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, sks, wfs);
            // Make a copy of the user to enable updates of workflowSession table
            final User u = new SessionUser(user.getId());
            wst.startWorkFlow(false, u);
            workFlowSessionTasks.add(wst);
            return true;
        } catch (Exception e) {
            // Best effort: one session failing to resume must not stop the others, but the
            // cause has to be visible somewhere.
            log.error("Failed to resume workflow session " + pws.getId(), e);
            return false;
        }
    }

    /** Returns whether a not-yet-completed task already tracks the given session id. */
    private boolean isSessionLoaded(int sessionId) {
        for (WorkflowSessionTask workflowSessionTask : workFlowSessionTasks) {
            if (workflowSessionTask.ksession.getId() == sessionId && !workflowSessionTask.completed) {
                return true;
            }
        }
        return false;
    }

    /**
     * Purges completed tasks, creates a fresh persistent session for the requested package and
     * starts the workflow in it.
     *
     * @param wfs settings describing the package, workflow, data and user
     * @return the id of the newly created session
     */
    @Override
    public synchronized int startNewSession(WorkflowSettings wfs) throws xAPIException {
        purgeDeadSessions();
        KnowledgeBase kbase = packageService.getKnowledgeBase(wfs.getPackageName(), wfs.getPackageVersion());

        StatefulKnowledgeSession session = getSession(kbase);
        // The field only remembers the latest session; the task below owns this one.
        ksession = session;

        log.debug("Starting new session:" + session.getId());
        wfs.setSessionId(session.getId());
        WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, session, wfs);
        // Make a copy of the user to enable updates of workflowSession table
        final User u = new SessionUser(wfs.getUser().getId());
        wst.startWorkFlow(true, u);
        this.workFlowSessionTasks.add(wst);
        log.info("WorkflowSettings:" + wfs.toString());
        return session.getId();
    }

    /** Creates a new JPA-backed session for the given knowledge base. */
    private StatefulKnowledgeSession getSession(KnowledgeBase kb) {
        Environment env = KnowledgeBaseFactory.newEnvironment();
        env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, packageService.getEntityManagerFactory());
        return JPAKnowledgeService.newStatefulKnowledgeSession(kb, null, env);
    }

    /**
     * Couples one knowledge session with its workflow settings and records whether the
     * process running in it has completed.
     */
    protected class WorkflowSessionTask {

        private StatefulKnowledgeSession ksession;
        private WorkflowSettings workflowSettings;
        private boolean completed;
        private WorkflowSessionService pws;

        public WorkflowSessionTask(WorkflowSessionService pws, StatefulKnowledgeSession session, WorkflowSettings wfs) {
            this.ksession = session;
            this.workflowSettings = wfs;
            this.pws = pws;
        }

        /**
         * Registers the work item handlers and event listeners on the session and, when
         * requested, starts the configured process.
         *
         * @param startProcess {@code true} to start the workflow named in the settings;
         *                     {@code false} to only re-attach handlers (resume case)
         * @param user         not read by this method
         */
        public void startWorkFlow(boolean startProcess, final User user) {
            this.completed = false;

            // Constructed only for its effect on the session; the reference is not needed.
            // NOTE(review): presumably attaches the audit logger to the session - confirm.
            new JPAWorkingMemoryDbLogger(ksession);

            // A unique connector name per task.
            String connectorName = "Hornet" + UUID.randomUUID().toString();
            final AsyncHornetQHTWorkItemHandler humanTaskHandler = new AsyncHornetQHTWorkItemHandler(
                    new AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
            humanTaskHandler.setIpAddress(ipAddress);
            humanTaskHandler.setOwningSessionOnly(true);

            // Executed when the process completes, to release the handler and the session.
            final CMTDisposeCommand dc = new CMTDisposeCommand();
            dc.setWorkitemhandler(humanTaskHandler);

            ksession.getWorkItemManager().registerWorkItemHandler("Human Task", humanTaskHandler);
            ksession.getWorkItemManager().registerWorkItemHandler("UpdateWorkflowCurrentTask",
                    new WorkflowCurrentTaskHandler(null, ksession.getId(), null));
            ksession.addEventListener(new WorkflowEventListener(null, ksession.getId(), null));
            ksession.addEventListener(new DefaultProcessEventListener() {
                @Override
                public void afterProcessCompleted(ProcessCompletedEvent event) {
                    log.info("~~~~~~~~~Workflow Session:" + ksession.getId() + " Completed~~~~~~~~~");
                    log.info("Disposing of " + event.getProcessInstance().getProcessName() + "!");
                    // Dispose through the runtime that raised the event rather than the
                    // reference captured by this task.
                    StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
                    s.execute(dc);
                    completed = true;
                    workflowSettings.setCompleted(true);
                }
            });

            if (startProcess) {
                ksession.startProcess(workflowSettings.getWorkflowName(), workflowSettings.getWorkFlowData());
            }
        }

        /** Returns whether the process in this task's session has completed. */
        public boolean getCompleted() {
            return this.completed;
        }

        /** Returns the settings this task was created with. */
        public WorkflowSettings getWorkFlowSettings() {
            return this.workflowSettings;
        }
    }

    /** Drops every task whose process has completed. */
    private void purgeDeadSessions() {
        for (Iterator<WorkflowSessionTask> iterator = workFlowSessionTasks.iterator(); iterator.hasNext();) {
            WorkflowSessionTask task = iterator.next();
            if (task.getCompleted()) {
                iterator.remove();
            }
        }
    }

    /** Returns the settings of every tracked task, in registration order, as a new list. */
    @Override
    public synchronized ArrayList<WorkflowSettings> getWorkflowSettings() {
        ArrayList<WorkflowSettings> sessions = new ArrayList<WorkflowSettings>();
        for (WorkflowSessionTask task : workFlowSessionTasks) {
            sessions.add(task.getWorkFlowSettings());
        }
        return sessions;
    }

    /** Appends a workflow to the pending queue. */
    private boolean queueWorkflow(WorkflowSettings wfs) {
        log.info("Queueing workflow: " + wfs.getWorkflowName());
        return this.workflowQueue.add(wfs);
    }

    /**
     * Drains the pending queue, starting a new session for each entry. A workflow that fails
     * to start is logged and skipped so the remaining entries are still processed.
     */
    public void dequeueAndStartWorkflows() {
        log.info("Polling workflowQueue");
        WorkflowSettings wfs;
        while ((wfs = workflowQueue.poll()) != null) {
            try {
                this.startNewSession(wfs);
            } catch (xAPIException e) {
                log.error("Failed to start queued workflow: " + wfs.getWorkflowName(), e);
            }
        }
    }
}
CMTDisposeCommand
package com.x.api.util;
import javax.naming.InitialContext;
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;
import org.apache.log4j.Logger;
import org.drools.command.Context;
import org.drools.command.impl.GenericCommand;
import org.drools.command.impl.KnowledgeCommandContext;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.workitem.wsht.AsyncGenericHTWorkItemHandler;
/**
 * Command that postpones disposal of a knowledge session until the transaction that is
 * active when the command executes has completed.
 *
 * <p>On execution a {@link Synchronization} is registered with the current transaction; its
 * {@code afterCompletion} callback disposes the optional work item handler and then the
 * session. The transaction manager is obtained from JNDI under {@code tmLookupName}.
 */
public class CMTDisposeCommand implements GenericCommand<Void> {

    private static final long serialVersionUID = 1L;
    private static final Logger log = Logger.getLogger(CMTDisposeCommand.class);

    /** JNDI name of the transaction manager. */
    private String tmLookupName = "java:jboss/TransactionManager";

    /** Handler to dispose together with the session; may be {@code null}. */
    private AsyncGenericHTWorkItemHandler workitemhandler;

    /** Creates a command that uses the default transaction manager JNDI name. */
    public CMTDisposeCommand() {
    }

    /**
     * Creates a command that looks the transaction manager up under a custom JNDI name.
     *
     * @param tmLookup JNDI name of the transaction manager
     */
    public CMTDisposeCommand(String tmLookup) {
        this.tmLookupName = tmLookup;
    }

    public AsyncGenericHTWorkItemHandler getWorkitemhandler() {
        return workitemhandler;
    }

    public void setWorkitemhandler(AsyncGenericHTWorkItemHandler workitemhandler) {
        this.workitemhandler = workitemhandler;
    }

    /**
     * Registers the deferred disposal with the current transaction.
     *
     * <p>Failures (JNDI lookup, missing transaction, registration) are logged and the command
     * still returns normally; in that case nothing is disposed.
     *
     * @param context command context; must be a {@link KnowledgeCommandContext}
     * @return always {@code null}
     */
    @Override
    public Void execute(Context context) {
        final StatefulKnowledgeSession ksession = ((KnowledgeCommandContext) context).getStatefulKnowledgesession();
        try {
            TransactionManager tm = (TransactionManager) new InitialContext().lookup(tmLookupName);
            if (tm.getTransaction() == null) {
                // Previously surfaced as an anonymous NullPointerException stack trace.
                log.error("No transaction is associated with the current thread; session "
                        + ksession.getId() + " was not scheduled for disposal");
                return null;
            }
            tm.getTransaction().registerSynchronization(new Synchronization() {
                @Override
                public void beforeCompletion() {
                    // not used here
                }

                @Override
                public void afterCompletion(int arg0) {
                    if (workitemhandler != null) {
                        try {
                            log.info("Disposing workitemHandler for session:" + ksession.getId());
                            workitemhandler.dispose();
                        } catch (Exception e) {
                            // A handler failure must not prevent the session from being disposed.
                            log.error("Failed to dispose work item handler for session " + ksession.getId(), e);
                        }
                    }
                    ksession.dispose();
                }
            });
        } catch (Exception e) {
            log.error("Could not register disposal of session " + ksession.getId()
                    + " with transaction manager '" + tmLookupName + "'", e);
        }
        return null;
    }
}
PackageServiceImpl
package com.x.api.service.impl;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Set;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.io.impl.UrlResource;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.x.api.model.JbpmPackage;
import com.x.api.model.JbpmWorkflow;
import com.x.api.service.PackageService;
import com.x.api.util.APIException;
/**
 * Loads jBPM packages and knowledge bases from the Guvnor instance configured by the
 * {@code DroolsGuvnorUrl} property and caches them in memory. Also owns the
 * {@link EntityManagerFactory} used for session persistence.
 */
@Service("packageService")
public class PackageServiceImpl implements InitializingBean, PackageService {

    private static Logger log = Logger.getLogger(PackageService.class);

    @Value( "${DroolsGuvnorUrl}" )
    private String url;

    private EntityManagerFactory emf = null;

    /** Knowledge bases already built, keyed by {@code packageName|version}. */
    private HashMap<String, KnowledgeBase> knowLedgeBases = new HashMap<String, KnowledgeBase>();

    /** Result of the last package listing; {@code null} until first loaded or after a clear. */
    private List<JbpmPackage> packageCache;

    // keys by package
    private Hashtable<String, List<JbpmWorkflow>> workflowCache;

    /** Creates the entity manager factory and an empty workflow cache at bean start-up. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // TODO load required knowledgeBases
        this.emf = getEntityManagerFactory();
        this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>();
    }

    /** Discards the package list and workflow caches; built knowledge bases are kept. */
    @Override
    public void clearCaches() {
        this.packageCache = null;
        this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>();
    }

    /**
     * Returns the packages whose title starts with {@code "x."}, fetching them from the REST
     * endpoint {@code <url>/rest/packages} on first use and serving the cached list afterwards.
     *
     * @return the cached package list
     * @throws APIException wrapping any failure while calling or parsing the REST response
     */
    @Override
    public List<JbpmPackage> getJbpmPackages() throws APIException {
        // caching to speed up the requests
        if (packageCache == null) {
            List<JbpmPackage> packages = new ArrayList<JbpmPackage>();
            // NOTE(review): credentials are hard-coded; consider moving them to configuration.
            String authorizationHeader = "Basic " + org.apache.cxf.common.util.Base64Utility.encode("admin:admin".getBytes());
            HttpClient httpclient = new DefaultHttpClient();
            try {
                URIBuilder builder = new URIBuilder(url +"/rest/packages");
                URI uri = builder.build();
                HttpGet httpget = new HttpGet(uri);
                httpget.addHeader("Authorization", authorizationHeader);
                httpget.addHeader("Accept", "application/json");
                HttpResponse response = httpclient.execute(httpget);
                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                    // NOTE(review): PIException is not among this file's imports (APIException
                    // is) - confirm which type is intended. It is wrapped by the catch below.
                    throw new PIException("Failed to retrieve list of packages from drools REST service",response.getStatusLine().getStatusCode());
                }
                String json = IOUtils.toString(response.getEntity().getContent(),"UTF-8");
                JSONArray jsonArray = new JSONArray(json);
                for (int i = 0; i < jsonArray.length(); i++) {
                    JSONObject jo = (JSONObject) jsonArray.get(i);
                    String title = jo.getString("title");
                    if (title.startsWith("x.")) {
                        JbpmPackage wp = new JbpmPackage();
                        wp.setId(title);
                        // NOTE(review): the offset 8 is longer than the "x." prefix tested
                        // above; titles shorter than 8 characters will fail here - confirm.
                        wp.setDisplayTitle(title.substring(title.indexOf("x.")+8).replace("_", " "));
                        wp.setDescription(jo.getString("description"));
                        wp.setPublishedDate(new Date(jo.getLong("published")));
                        JSONObject meta = jo.getJSONObject("metadata");
                        wp.setVersion(meta.getInt("versionNumber"));
                        wp.setArchived(meta.getBoolean("archived"));
                        wp.setCreatedDate(new Timestamp(meta.getLong("created")));
                        log.debug("WorkflowPackage: "+wp.toString());
                        packages.add(wp);
                    }
                }
            } catch (Exception ue) {
                throw new APIException(ue);
            } finally {
                // Release the client's connections whether or not the request succeeded.
                httpclient.getConnectionManager().shutdown();
            }
            this.packageCache = packages;
        }
        return this.packageCache;
    }

    /**
     * Returns the knowledge base for a package version, building it from the repository on
     * first request and caching it under {@code packageName|version}.
     *
     * @param packageName package to load; must not be {@code null}
     * @param version     package version, or {@code null} for {@code "LATEST"}
     * @throws PIException when {@code packageName} is {@code null} or the package fails to build
     */
    @Override
    public KnowledgeBase getKnowledgeBase(String packageName, String version) throws PIException{
        if (packageName == null) {
            throw new PIException("Package Name must not be null.");
        }
        if (version == null) {
            version = "LATEST";
        }
        log.debug("Package requested: " + packageName + "/" + version);
        String key = packageName + "|" + version;
        if (!knowLedgeBases.containsKey(key)) {
            log.info("Package " + key + " not cached, attempting to get from repository");
            UrlResource resource = (UrlResource) ResourceFactory.newUrlResource(url + "/org.drools.guvnor.Guvnor/package/" + packageName + "/" + version);
            // NOTE(review): credentials are hard-coded; consider moving them to configuration.
            resource.setBasicAuthentication("enabled");
            resource.setUsername("guest");
            resource.setPassword("guest");
            KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
            kbuilder.add(resource, ResourceType.PKG);
            if (kbuilder.hasErrors()) {
                // Fail before caching so a broken package is retried on the next request.
                throw new PIException("Failed to build package " + key + ": " + kbuilder.getErrors());
            }
            knowLedgeBases.put(key, kbuilder.newKnowledgeBase());
        }
        log.debug(knowLedgeBases.toString());
        return knowLedgeBases.get(key);
    }

    /**
     * Returns the entity manager factory for the {@code org.jbpm.persistence.jpa} persistence
     * unit, creating it on first call.
     */
    @Override
    public EntityManagerFactory getEntityManagerFactory(){
        if (emf == null) {
            emf = Persistence.createEntityManagerFactory( "org.jbpm.persistence.jpa" );
        }
        return emf;
    }

    /** Returns the cache keys ({@code packageName|version}) of the knowledge bases built so far. */
    @Override
    public synchronized Set<String> getLoadedKnowledgeBases(){
        return knowLedgeBases.keySet();
    }
}
Cheers,
Gareth.
--------------------------------------------------------------
Reply to this message by going to Community
[https://community.jboss.org/message/806960#806960]
Start a new discussion in jBPM at Community
[https://community.jboss.org/choose-container!input.jspa?contentType=1&containerType=14&container=2034]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/jboss-user/attachments/20130408/d8c1c0ca/attachment-0001.html
More information about the jboss-user
mailing list