[jboss-user] [jBPM] - Re: Problem with multiple sessions

Gareth Edwards do-not-reply at jboss.com
Mon Apr 8 06:11:19 EDT 2013


Gareth Edwards [https://community.jboss.org/people/garethed] created the discussion

"Re: Problem with multiple sessions"

To view the discussion, visit: https://community.jboss.org/message/806960#806960

--------------------------------------------------------------
Cheers,

Here are the main classes.  I have removed some irrelevant methods and renamed certain variables.
I changed the dispose call in afterProcessCompleted to this: 
StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
s.execute(dc);

but still see the same behaviour.



package com.x.api.service.impl;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;


import javax.persistence.EntityManagerFactory;


import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.event.process.DefaultProcessEventListener;
import org.drools.event.process.ProcessCompletedEvent;
import org.drools.persistence.jpa.JPAKnowledgeService;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.audit.JPAWorkingMemoryDbLogger;
import org.jbpm.process.workitem.wsht.AsyncHornetQHTWorkItemHandler;
import org.jbpm.task.service.hornetq.AsyncHornetQTaskClient;
import org.jbpm.task.utils.OnErrorAction;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;


import com.x.api.jbpm.handler.BiometricVerificationHandler;
import com.x.api.jbpm.handler.WorkflowCurrentTaskHandler;
import com.x.api.jbpm.listener.WorkflowEventListener;
import com.x.api.model.WorkflowSession;
import com.x.api.model.WorkflowSettings;
import com.x.api.model.user.SessionUser;
import com.x.api.model.user.User;
import com.x.api.service.PackageService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.util.CMTDisposeCommand;
import com.x.api.util.xAPIException;


public class WorkflowSessionServiceImpl implements InitializingBean, WorkflowSessionService {


          private static Logger log = Logger.getLogger(WorkflowSessionServiceImpl.class);
  
          @Autowired
          private PackageService packageService;
  
          @Autowired
          private WorkflowSessionService WorkflowSessionService;
  
          private StatefulKnowledgeSession ksession = null;
  
          private ArrayList<WorkflowSessionTask> workFlowSessionTasks;
  
          private Queue<WorkflowSettings> workflowQueue = new LinkedList<WorkflowSettings>();
  
          @Value("${HumanTaskServiceIp}")
          private String ipAddress;
  
          @Override
          public void afterPropertiesSet() throws Exception {
                    workFlowSessionTasks = new ArrayList<WorkflowSessionServiceImpl.WorkflowSessionTask>();
          }
  
          @Override
          public List<WorkflowSession> resumeWorkflowSessions(User user)throws xAPIException{
                    List<WorkflowSession> workflowSessions = WorkflowSessionService.getActiveSessions();
                    List<WorkflowSession> resumedSessions = new ArrayList<WorkflowSession>();
  
                    for (WorkflowSession WorkflowSession : workflowSessions) {
                              if (!isSessionLoaded(WorkflowSession.getId())){
                                        if (this.resumeWorkflowSession(WorkflowSession, user))
                                                  resumedSessions.add(WorkflowSession);
                              }
                    }
                    return resumedSessions;
          }
  
          private boolean resumeWorkflowSession(WorkflowSession pws, User user)throws xAPIException{
                    boolean success = false;
                    EntityManagerFactory emf = packageService.getEntityManagerFactory();
                    Environment env = KnowledgeBaseFactory.newEnvironment();
                    KnowledgeBase kbase;
                    try {
                              kbase = packageService.getKnowledgeBase(pws.getPackageRef(), pws.getPackageVersion());
                        env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
                        StatefulKnowledgeSession sks = JPAKnowledgeService.loadStatefulKnowledgeSession(pws.getId(), kbase, null, env);
                        WorkflowSettings wfs = new WorkflowSettings();
                        wfs.setActive(true);
                        wfs.setCompleted(false);
                        wfs.setId(sks.getId());
                        wfs.setPackageName(pws.getPackageRef());
                        wfs.setPackageVersion(pws.getPackageVersion());
                        WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, sks, wfs);
                        // Make a copy of the user to enable updates of workflowSession table
                        final User u = new SessionUser(user.getId());
                        wst.startWorkFlow(false, u);
                        workFlowSessionTasks.add(wst);
                        success = true;
                    } catch (Exception e) {
                              success = false;
                              //throw new xAPIException("Error getting knowledge base:" + e.getLocalizedMessage());
                    }
                    return success;
          }
  
          private boolean isSessionLoaded(int sessionId){
  
                    for (WorkflowSessionTask workflowSessionTask : workFlowSessionTasks) {
                              if (workflowSessionTask.ksession.getId() == sessionId &amp;&amp; !workflowSessionTask.completed){
                                        return true;
                              }
                    }
                    return false;
          }
  
          @Override
          public synchronized int startNewSession(WorkflowSettings wfs) throws xAPIException{
                    purgeDeadSessions();
  
                    KnowledgeBase kbase = packageService.getKnowledgeBase(wfs.getPackageName(), wfs.getPackageVersion());
  
                    //StatefulKnowledgeSession ksession = getSession(kbase);
                    //if (ksession == null)
                              ksession = getSession(kbase);
                    log.debug("Starting new session:" + ksession.getId());
                    wfs.setSessionId(ksession.getId());
                    WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, ksession, wfs);
                    // Make a copy of the user to enable updates of workflowSession table
              final User u = new SessionUser(wfs.getUser().getId());
                    wst.startWorkFlow(true, u);
  
                    this.workFlowSessionTasks.add(wst);
  
                    log.info("WorkflowSettings:" + wfs.toString());
                    return ksession.getId();
          }
  
             private StatefulKnowledgeSession getSession(KnowledgeBase kb){
                       Environment env = KnowledgeBaseFactory.newEnvironment();
                 env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, packageService.getEntityManagerFactory());
                       StatefulKnowledgeSession ksession = JPAKnowledgeService.newStatefulKnowledgeSession(kb, null, env);
                       return ksession;
             }
             
             protected class WorkflowSessionTask{
                       
                       private StatefulKnowledgeSession ksession;
                       private WorkflowSettings workflowSettings;
                       private boolean completed;
                       private WorkflowSessionService pws;
                       
                       public WorkflowSessionTask(WorkflowSessionService pws, StatefulKnowledgeSession session, WorkflowSettings wfs){
                                 this.ksession = session;
                                 this.workflowSettings = wfs;
                                 this.pws = pws;
                       }




                    public void startWorkFlow(boolean startProcess, final User user) {
                              this.completed = false;
                              JPAWorkingMemoryDbLogger logger = new JPAWorkingMemoryDbLogger(ksession);
                  String connectorName = "Hornet" + UUID.randomUUID().toString();
                  final AsyncHornetQHTWorkItemHandler humanTaskHandler = new AsyncHornetQHTWorkItemHandler(new AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
                  //final HumanTaskHandler humanTaskHandler = new HumanTaskHandler(new AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
                  humanTaskHandler.setIpAddress(ipAddress);
                  humanTaskHandler.setOwningSessionOnly(true);
                              final CMTDisposeCommand dc = new CMTDisposeCommand();
                              dc.setWorkitemhandler(humanTaskHandler);
  
                              ksession.getWorkItemManager().registerWorkItemHandler("Human Task", humanTaskHandler); 
                              ksession.getWorkItemManager().registerWorkItemHandler("UpdateWorkflowCurrentTask", new WorkflowCurrentTaskHandler(null, ksession.getId(), null));
                              ksession.addEventListener(new WorkflowEventListener(null, ksession.getId(), null));
                              ksession.addEventListener(new DefaultProcessEventListener(){
                                        @Override
                                        public void afterProcessCompleted(ProcessCompletedEvent event){
                                                  log.info("~~~~~~~~~Workflow Session:" + ksession.getId() + " Completed~~~~~~~~~");
                                                  log.info("Disposing of " + event.getProcessInstance().getProcessName() + "!");
                                                  StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
                                                  s.execute(dc);
                                                  //ksession.execute(dc);
                                                  completed = true;
                                                  workflowSettings.setCompleted(true);
                                        }
                              }); 
  
                              if (startProcess)
                                        ksession.startProcess(workflowSettings.getWorkflowName(),workflowSettings.getWorkFlowData());
                    }
  
                    public boolean getCompleted(){
                              return this.completed;
                    }
  
                    public WorkflowSettings getWorkFlowSettings(){
                              return this.workflowSettings;
  
                    }
  
             }
             
             private void purgeDeadSessions(){
                       for (Iterator<WorkflowSessionTask> iterator = workFlowSessionTasks.iterator(); iterator.hasNext();) {
                              WorkflowSessionTask task = iterator.next();
                              if (task.getCompleted())
                                        iterator.remove(); 
                    }
             }
             


          @Override
          public synchronized ArrayList<WorkflowSettings> getWorkflowSettings(){
                       ArrayList<WorkflowSettings> sessions = new ArrayList<WorkflowSettings>();
                       for (WorkflowSessionTask task : workFlowSessionTasks ) {
                              sessions.add(task.getWorkFlowSettings());
                    }
                       return sessions;
          }
  
          private boolean queueWorkflow(WorkflowSettings wfs){
                    log.info("Queueing workflow: " + wfs.getWorkflowName());
                    return this.workflowQueue.add(wfs);
          }
  
  
          public void dequeueAndStartWorkflows(){
                    log.info("Polling workflowQueue");
                    WorkflowSettings wfs = null;
                    do{
                              wfs = workflowQueue.poll();
                              if (wfs != null)
                                        try {
                                                  this.startNewSession(wfs);
                                        } catch (xAPIException e) {
                                                  // TODO Auto-generated catch block
                                                  e.printStackTrace();
                                        }
                    }
                    while(wfs!=null);
          }
}







CMTDisposeCommand



package com.x.api.util;


import javax.naming.InitialContext;
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;


import org.apache.log4j.Logger;
import org.drools.command.Context;
import org.drools.command.impl.GenericCommand;
import org.drools.command.impl.KnowledgeCommandContext;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.workitem.wsht.AsyncGenericHTWorkItemHandler;


public class CMTDisposeCommand implements GenericCommand<Void> {


    private static final long serialVersionUID = 1L;
    private static Logger log = Logger.getLogger(CMTDisposeCommand.class);
    private String tmLookupName = "java:jboss/TransactionManager";
    
    public CMTDisposeCommand() {    
    }


    private AsyncGenericHTWorkItemHandler workitemhandler;
    
    public CMTDisposeCommand(String tmLookup) {
        this.tmLookupName = tmLookup;
    }
    
    public AsyncGenericHTWorkItemHandler getWorkitemhandler() {
                    return workitemhandler;
          }


          public void setWorkitemhandler(AsyncGenericHTWorkItemHandler workitemhandler) {
                    this.workitemhandler = workitemhandler;
          }


          @Override
    public Void execute(Context context) {
        
        final StatefulKnowledgeSession ksession = ((KnowledgeCommandContext) context).getStatefulKnowledgesession();
        try {
            TransactionManager tm = (TransactionManager) new InitialContext().lookup(tmLookupName);
            tm.getTransaction().registerSynchronization(new Synchronization() {
                
                @Override
                public void beforeCompletion() {
                    // not used here
                    
                }
                
                @Override
                public void afterCompletion(int arg0) {
                          if (workitemhandler != null)
                                                            try {
                                                                      log.info("Disposing workitemHandler for session:" + ksession.getId());
                                                                      workitemhandler.dispose();
                                                            } catch (Exception e) {
                                                                      e.printStackTrace();
                                                            }
                    ksession.dispose();               
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }          
        return null;
          }
}




PackageServiceImpl



package com.x.api.service.impl;


import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Set;


import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;


import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.io.impl.UrlResource;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


import com.x.api.model.JbpmPackage;
import com.x.api.model.JbpmWorkflow;
import com.x.api.service.PackageService;
import com.x.api.util.APIException;




@Service("packageService")
public class PackageServiceImpl implements InitializingBean, PackageService {
          private static Logger log = Logger.getLogger(PackageService.class);
  
          @Value( "${DroolsGuvnorUrl}" )
          private String url;
  
          private EntityManagerFactory emf = null;
          private HashMap<String, KnowledgeBase> knowLedgeBases = new HashMap<String, KnowledgeBase>(); 
  
          private List<JbpmPackage> packageCache;
  
          // keys by package
          private Hashtable<String, List<JbpmWorkflow>> workflowCache;
  
          @Override
          public void afterPropertiesSet() throws Exception {
                    // TODO load required knowledgeBases
                    this.emf = getEntityManagerFactory();
                    this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>();
          }
  
          @Override
          public void clearCaches() {
                    this.packageCache = null;
                    this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>(); 
          }
  
          @Override
          public List<JbpmPackage> getJbpmPackages() throws APIException {
                    // caching to speed up the requests
                    if (packageCache==null) {
                              List<JbpmPackage> packages = new ArrayList<JbpmPackage>();
                              String authorizationHeader = "Basic " + org.apache.cxf.common.util.Base64Utility.encode("admin:admin".getBytes());
                              HttpClient httpclient = new DefaultHttpClient();
                              try {
                                        URIBuilder builder = new URIBuilder(url +"/rest/packages");
  
                                        URI uri = builder.build();
                                        HttpGet httpget = new HttpGet(uri);
                                        httpget.addHeader("Authorization", authorizationHeader);
                                        httpget.addHeader("Accept", "application/json");
                                        HttpResponse response = httpclient.execute(httpget);
                                        if (response.getStatusLine().getStatusCode()!=HttpStatus.SC_OK) 
                                                  throw new PIException("Failed to retrieve list of packages from drools REST service",response.getStatusLine().getStatusCode());
                                        else {
                                                  String json = IOUtils.toString(response.getEntity().getContent(),"UTF-8");
                                                  JSONArray jsonArray = new JSONArray(json); 
                                                  for (int i=0;i<jsonArray.length();i++) {
                                                            JSONObject jo = (JSONObject)jsonArray.get(i);
                                                            String title = jo.getString("title");
                                                            if (title.startsWith("x.")) {
                                                                      JbpmPackage wp = new JbpmPackage();
                                                                      wp.setId(title);
                                                                      wp.setDisplayTitle(title.substring(title.indexOf("x.")+8).replace("_", " "));
                                                                      wp.setDescription(jo.getString("description"));
                                                                      wp.setPublishedDate(new Date(jo.getLong("published")));
                                                                      JSONObject meta = jo.getJSONObject("metadata");
                                                                      wp.setVersion(meta.getInt("versionNumber"));
                                                                      wp.setArchived(meta.getBoolean("archived"));
                                                                      wp.setCreatedDate(new Timestamp(meta.getLong("created")));
                                                                      log.debug("WorkflowPackage: "+wp.toString());
                                                                      packages.add(wp); 
                                                            }
                                                  }
                                        }
                              }
                              catch (Exception ue) {
                                        throw new APIException(ue);
                              }
                              this.packageCache = packages;
                    }
                    return this.packageCache;
          }


          @Override
          public KnowledgeBase getKnowledgeBase(String packageName, String version) throws PIException{
  
                    if (packageName == null)
                              throw new PIException("Package Name must not be null.");
  
                    if (version == null)
                              version = "LATEST";
  
                    log.debug("Package requested: " + packageName + "/" + version);
  
                    String key = packageName + "|" + version;
  
                    if (!knowLedgeBases.containsKey(key)){
                              log.info("Package " + key + " not cached, attempting to get from repository");
                        UrlResource resource = (UrlResource) ResourceFactory.newUrlResource(url + "/org.drools.guvnor.Guvnor/package/" + packageName + "/" + version);
                        resource.setBasicAuthentication("enabled");
                        resource.setUsername("guest");
                        resource.setPassword("guest");
                        KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
                        kbuilder.add(resource, ResourceType.PKG); 
                        knowLedgeBases.put(key, kbuilder.newKnowledgeBase());                 
                    }
  
                    log.debug(knowLedgeBases.toString());
  
                    return knowLedgeBases.get(key);
          }
  


          @Override
          public EntityManagerFactory getEntityManagerFactory(){
                    if (emf == null){
                              emf = Persistence.createEntityManagerFactory( "org.jbpm.persistence.jpa" );
                              Environment env = KnowledgeBaseFactory.newEnvironment();
                              env.set( EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
                    }
                    return emf;
          }
  
          @Override
          public synchronized Set<String> getLoadedKnowledgeBases(){
                    return knowLedgeBases.keySet();
          }
}



Cheers,

Gareth.
--------------------------------------------------------------

Reply to this message by going to Community
[https://community.jboss.org/message/806960#806960]

Start a new discussion in jBPM at Community
[https://community.jboss.org/choose-container!input.jspa?contentType=1&containerType=14&container=2034]

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/jboss-user/attachments/20130408/d8c1c0ca/attachment-0001.html 


More information about the jboss-user mailing list