[jbpm-commits] JBoss JBPM SVN: r4073 - in jbpm3/trunk/modules: core/src/main/java/org/jbpm/job/executor and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Feb 27 20:05:43 EST 2009


Author: alex.guizar at jboss.com
Date: 2009-02-27 20:05:43 -0500 (Fri, 27 Feb 2009)
New Revision: 4073

Added:
   jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jbpm1952/
   jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jbpm1952/JBPM1952Test.java
Removed:
   jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobsCommand.java
Modified:
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/db/JobSession.java
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/svc/Services.java
   jbpm3/trunk/modules/core/src/test/java/org/jbpm/db/JobSessionDbTest.java
   jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobCommand.java
   jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/AbstractEnterpriseTestCase.java
   jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jms/JmsMessageTest.java
Log:
JBPM-1952: SOSE (StaleObjectStateException) in async fork with multiple job processors

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/db/JobSession.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/db/JobSession.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -28,7 +28,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.hibernate.Criteria;
 import org.hibernate.HibernateException;
 import org.hibernate.Query;
 import org.hibernate.Session;
@@ -118,11 +117,13 @@
    */
   public List<Job> findFailedJobs() {
     try {
-      List jobs = session.createCriteria(Job.class)
+      List<?> jobs = session.createCriteria(Job.class)
         .add(Restrictions.eq("retries", 0))
-        .add(Restrictions.isNotNull("exception")).list();
+        .add(Restrictions.isNotNull("exception"))
+        .list();
       return CollectionUtil.checkList(jobs, Job.class);      
-    } catch (Exception e) {
+    }
+    catch (HibernateException e) {
       throw new JbpmException("couldn't find failed jobs", e);
     }
   }

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -8,7 +8,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.hibernate.HibernateException;
 
 import org.jbpm.JbpmConfiguration;
 import org.jbpm.JbpmContext;
@@ -16,8 +15,8 @@
 import org.jbpm.graph.exe.ProcessInstance;
 import org.jbpm.job.Job;
 import org.jbpm.persistence.JbpmPersistenceException;
+import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-import org.jbpm.svc.Services;
 
 public class JobExecutorThread extends Thread {
 
@@ -143,8 +142,8 @@
           log.debug("obtained lock on jobs: " + acquiredJobs);
         }
         catch (JbpmPersistenceException e) {
-          // if this is a stale object exception, keep it quiet
-          if (Services.isCausedByStaleState(e)) {
+          // if this is a stale state exception, keep it quiet
+          if (DbPersistenceService.isStaleStateException(e)) {
             log.debug("optimistic locking failed, could not acquire jobs " + acquiredJobs);
             StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
                 "optimistic locking failed, could not acquire jobs " + acquiredJobs, e);
@@ -177,7 +176,7 @@
       }
       catch (Exception e) {
         log.debug("exception while executing " + job, e);
-        if (!isPersistenceException(e)) {
+        if (!DbPersistenceService.isPersistenceException(e)) {
           StringWriter memoryWriter = new StringWriter();
           e.printStackTrace(new PrintWriter(memoryWriter));
           job.setException(memoryWriter.toString());
@@ -201,7 +200,7 @@
       }
       catch (JbpmPersistenceException e) {
         // if this is a stale state exception, keep it quiet
-        if (Services.isCausedByStaleState(e)) {
+        if (DbPersistenceService.isStaleStateException(e)) {
           log.debug("optimistic locking failed, could not complete job " + job);
           StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
               "optimistic locking failed, could not complete job " + job, e);
@@ -213,14 +212,6 @@
     }
   }
 
-  private static boolean isPersistenceException(Throwable throwable) {
-    do {
-      if (throwable instanceof HibernateException) return true;
-      throwable = throwable.getCause();
-    } while (throwable != null);
-    return false;
-  }
-
   protected Date getNextDueDate() {
     Date nextDueDate = null;
     JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
@@ -238,8 +229,8 @@
         jbpmContext.close();
       }
       catch (JbpmPersistenceException e) {
-        // if this is a stale object exception, keep it quiet
-        if (Services.isCausedByStaleState(e)) {
+        // if this is a stale state exception, keep it quiet
+        if (DbPersistenceService.isStaleStateException(e)) {
           log.debug("optimistic locking failed, could not return next due date: " + nextDueDate);
           StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
               "optimistic locking failed, could not return next due date: " + nextDueDate, e);

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -10,8 +10,8 @@
 import org.jbpm.db.JobSession;
 import org.jbpm.job.Job;
 import org.jbpm.persistence.JbpmPersistenceException;
+import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-import org.jbpm.svc.Services;
 
 public class LockMonitorThread extends Thread {
 
@@ -90,8 +90,8 @@
         jbpmContext.close();
       }
       catch (JbpmPersistenceException e) {
-        // if this is a stale object exception, keep it quiet
-        if (Services.isCausedByStaleState(e)) {
+        // if this is a stale state exception, keep it quiet
+        if (DbPersistenceService.isStaleStateException(e)) {
           log.debug("optimistic locking failed, could not unlock overdue jobs: " + overdueJobs);
           StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
               "problem unlocking overdue jobs: optimistic locking failed", e);

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.hibernate.HibernateException;
 import org.hibernate.Session;
 import org.hibernate.SessionFactory;
 import org.hibernate.StaleStateException;
@@ -77,8 +78,8 @@
   }
 
   public SessionFactory getSessionFactory() {
-    return session != null ? session.getSessionFactory()
-        : persistenceServiceFactory.getSessionFactory();
+    return session != null ? session.getSessionFactory() : persistenceServiceFactory
+        .getSessionFactory();
   }
 
   public Session getSession() {
@@ -225,14 +226,15 @@
       mustSessionBeFlushed = false; // commit does a flush anyway
       transaction.commit();
     }
-    catch (StaleStateException e) {
-      log.info("problem committing transaction: optimistic locking failed");
-      StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
-          "optimistic locking failed while committing " + transaction, e);
-      return e;
-    }
     catch (Exception e) {
-      log.error("transaction commit failed", e);
+      if (isStaleStateException(e)) {
+        log.info("problem committing transaction: optimistic locking failed");
+        StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
+            "optimistic locking failed while committing " + transaction, e);
+      }
+      else {
+        log.error("transaction commit failed", e);
+      }
       return e;
     }
     return null;
@@ -258,14 +260,15 @@
         log.debug("flushing " + session);
         session.flush();
       }
-      catch (StaleStateException e) {
-        log.info("problem flushing session: optimistic locking failed");
-        StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
-            "optimistic locking failed while flushing " + session, e);
-        return e;
-      }
       catch (Exception e) {
-        log.error("hibernate flush failed", e);
+        if (isStaleStateException(e)) {
+          log.info("problem flushing session: optimistic locking failed");
+          StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
+              "optimistic locking failed while flushing " + session, e);
+        }
+        else {
+          log.error("hibernate flush failed", e);
+        }
         return e;
       }
     }
@@ -504,5 +507,19 @@
     this.isTransactionEnabled = isTransactionEnabled;
   }
 
+  public static boolean isPersistenceException(Exception exception) {
+    for (Throwable t = exception; t != null; t = t.getCause()) {
+      if (t instanceof HibernateException) return true;
+    }
+    return false;
+  }
+
+  public static boolean isStaleStateException(Exception exception) {
+    for (Throwable t = exception; t != null; t = t.getCause()) {
+      if (t instanceof StaleStateException) return true;
+    }
+    return false;
+  }
+
   private static Log log = LogFactory.getLog(DbPersistenceService.class);
 }

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -32,7 +32,6 @@
 import org.jbpm.JbpmException;
 import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-import org.jbpm.svc.Services;
 
 public class JtaDbPersistenceService extends DbPersistenceService {
 
@@ -93,8 +92,8 @@
       return null;
     }
     catch (Exception e) {
-      if (Services.isCausedByStaleState(e)) {
-        log.info("optimistic locking failed, could not commit " + transaction);
+      if (isStaleStateException(e)) {
+        log.debug("optimistic locking failed, could not commit " + transaction);
         StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
             "optimistic locking failed, could not commit " + transaction, e);
       }

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/svc/Services.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/svc/Services.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/svc/Services.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -30,7 +30,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.hibernate.StaleStateException;
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.graph.exe.ProcessInstance;
@@ -38,6 +37,7 @@
 import org.jbpm.msg.MessageService;
 import org.jbpm.persistence.JbpmPersistenceException;
 import org.jbpm.persistence.PersistenceService;
+import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
 import org.jbpm.scheduler.SchedulerService;
 import org.jbpm.security.AuthenticationService;
@@ -243,9 +243,8 @@
             service.close();
           }
           catch (JbpmPersistenceException e) {
-            // if this is a stale state exception, the jbpm configuration has control over the
-            // logging
-            if (isCausedByStaleState(e)) {
+            // if this is a stale state exception, keep it quiet
+            if (DbPersistenceService.isStaleStateException(e)) {
               log.info("optimistic locking failed, could not close service: " + serviceName);
               StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
                   "optimistic locking failed, could not close service: " + serviceName, e);
@@ -274,13 +273,6 @@
     }
   }
 
-  public static boolean isCausedByStaleState(Exception exception) {
-    for (Throwable cause = exception.getCause(); cause != null; cause = cause.getCause()) {
-      if (cause instanceof StaleStateException) return true;
-    }
-    return false;
-  }
-
   public static void assignId(Object object) {
     JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
     if (jbpmContext != null) {

Modified: jbpm3/trunk/modules/core/src/test/java/org/jbpm/db/JobSessionDbTest.java
===================================================================
--- jbpm3/trunk/modules/core/src/test/java/org/jbpm/db/JobSessionDbTest.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/core/src/test/java/org/jbpm/db/JobSessionDbTest.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -36,18 +36,18 @@
 
   public static class FailingAction implements ActionHandler {
     private static final long serialVersionUID = 1L;
-    public void execute(ExecutionContext executionContext) throws Exception
-    {
+    
+    public void execute(ExecutionContext executionContext) throws Exception {
       throw new RuntimeException("TEST-EXCEPTION");
     }    
   }
 
   /**
    * Test case which generates a {@link Job} ({@link ExecuteNodeJob} via async=true)
-   * which causes an excpetion. Afterwards it is checked if this job 
+   * which causes an exception. Afterwards it is checked if this job 
    * is found by the getFailedJobs method
    */
-  public void notestLoadFailedJobs() throws Exception {
+  public void testLoadFailedJobs() throws Exception {
     String xml = 
         "<process-definition name='TestJob'>"
       + " <start-state>"
@@ -64,14 +64,14 @@
     // create a process definition
     ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(xml);
     // save it in the database
-    graphSession.saveProcessDefinition(processDefinition);
+    graphSession.deployProcessDefinition(processDefinition);
     
     newTransaction();
-    
     try {      
       // start a new process instance and signal it
       ProcessInstance pi = jbpmContext.newProcessInstance("TestJob");
       pi.getRootToken().signal();
+      jbpmContext.save(pi);
       
       newTransaction();
       
@@ -81,15 +81,23 @@
 
       // start job executor wait for job to be executed
       // and failure is written to database
-      startJobExecutor();
-      long startTime = System.currentTimeMillis();
-      while( jobSession.findFailedJobs().size() <= 0 ) {
-        if (System.currentTimeMillis() - startTime > timeout) {
-          fail("test execution exceeded treshold of " + timeout + " milliseconds");
+      commitAndCloseSession();
+      try {
+        startJobExecutor();
+
+        long startTime = System.currentTimeMillis();
+        while( getFailedJobCount() <= 0 ) {
+          if (System.currentTimeMillis() - startTime > timeout) {
+            fail("test execution exceeded threshold of " + timeout + " milliseconds");
+          }
+          Thread.sleep(500);
         }
-        Thread.sleep(500);
       }
-      stopJobExecutor();
+      finally {
+        stopJobExecutor();
+        beginSessionTransaction();
+      }
+      
       List<Job> failedJobs = jobSession.findFailedJobs();
 
       // now the one job we have should be failed
@@ -97,16 +105,28 @@
       assertEquals(1, failedJobs.size());
       
       // and information is set on the job
-      assertEquals(0, failedJobs.get(0).getRetries());
-      assertNotNull(failedJobs.get(0).getException() + "==null", 
-          failedJobs.get(0).getException());
-      assertTrue(failedJobs.get(0).getException() + " contains TEST-EXCEPTION", 
-          failedJobs.get(0).getException().indexOf("TEST-EXCEPTION")>0);    
+      Job job = failedJobs.get(0);
+      assertEquals(0, job.getRetries());
+
+      String exception = job.getException();
+      assertNotNull("expected job.exception not to be null", exception);
+      assertTrue("expected job.exception to contain TEST-EXCEPTION",
+          exception.contains("TEST-EXCEPTION"));    
     }
     finally {
-      newTransaction();
       // cleanup
       jbpmContext.getGraphSession().deleteProcessDefinition(processDefinition.getId());
     }
   }
+
+  int getFailedJobCount() {
+    beginSessionTransaction();
+    try {
+      return jobSession.findFailedJobs().size();
+    }
+    finally {
+      commitAndCloseSession();
+    }
+  }
+
 }
\ No newline at end of file

Modified: jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobCommand.java
===================================================================
--- jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobCommand.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobCommand.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -23,27 +23,19 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.List;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.NamingException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jbpm.JbpmContext;
+import org.jbpm.JbpmException;
 import org.jbpm.command.Command;
-import org.jbpm.db.JobSession;
+import org.jbpm.graph.exe.ProcessInstance;
 import org.jbpm.job.Job;
-import org.jbpm.msg.jms.JmsMessageService;
-import org.jbpm.msg.jms.JmsMessageServiceFactory;
-import org.jbpm.svc.Services;
+import org.jbpm.persistence.db.DbPersistenceService;
 
 /**
  * Individual job processing command.
+ * 
  * @author Alejandro Guizar
  */
 public class ExecuteJobCommand implements Command {
@@ -57,83 +49,50 @@
   }
 
   public Object execute(JbpmContext jbpmContext) throws Exception {
-    JobSession jobSession = jbpmContext.getJobSession();
-    Job job = jobSession.getJob(jobId);
-    if (job == null) {
-      log.debug("job " + jobId + " was deleted");
-      return null;
-    }
-    String lockOwner = job.getLockOwner();
-    if (lockOwner != null) {
-      log.debug(job + " is locked by " + lockOwner);
-      return null;
-    }
-    lockOwner = Long.toString(jobId);
+    Job job = acquireJob(jbpmContext);
+    executeJob(job, jbpmContext);
+    return job;
+  }
+
+  private Job acquireJob(JbpmContext jbpmContext) {
+    Job job = jbpmContext.getJobSession().loadJob(jobId);
+
+    // register process instance for automatic save
+    // see https://jira.jboss.org/jira/browse/JBPM-1015
+    ProcessInstance processInstance = job.getProcessInstance();
+    jbpmContext.addAutoSaveProcessInstance(processInstance);
+
+    // if job is exclusive, lock process instance
     if (job.isExclusive()) {
-      List<Job> exclusiveJobs = jobSession.findExclusiveJobs(lockOwner, job.getProcessInstance());
-      // lock exclusive jobs
-      if (exclusiveJobs.isEmpty()) {
-        // may happen if isolation level is below repeatable read
-        log.debug(job + " was locked during attempt to lock other jobs");
-        return null;
-      }
-      long[] exclusiveJobIds = new long[exclusiveJobs.size()];
-      int i = 0;
-      for (Job exclusiveJob : exclusiveJobs) {
-        exclusiveJob.setLockOwner(lockOwner);
-        exclusiveJobIds[i++] = exclusiveJob.getId();
-      }
-      log.debug("locking jobs " + exclusiveJobIds);
-      // execute exclusive jobs in separate transaction
-      postJobsExecution(jbpmContext, exclusiveJobIds);        
+      jbpmContext.getGraphSession().lockProcessInstance(processInstance);
     }
-    else {
-      // lock job to prevent others from deleting it
-      job.setLockOwner(lockOwner);
-      log.debug("executing " + job);
-      executeJob(job, jbpmContext);
-    }
-    return null;
+
+    // mark job as locked to prevent other parts of the engine from deleting it
+    job.setLockOwner(toString());
+    return job;
   }
 
   static void executeJob(Job job, JbpmContext jbpmContext) {
+    log.debug("executing " + job);
     try {
       if (job.execute(jbpmContext)) {
         jbpmContext.getJobSession().deleteJob(job);
       }
     }
-    catch (RuntimeException e) {
-      // nothing to do but clean up and exit
-      throw e;
-    }
     catch (Exception e) {
-      // save data about recoverable error condition
-      log.error("exception while executing " + job, e);
-      StringWriter memoryWriter = new StringWriter();
-      e.printStackTrace(new PrintWriter(memoryWriter));
-      job.setException(memoryWriter.toString());
-      job.setRetries(job.getRetries() - 1);
+      log.debug("exception while executing " + job, e);
+      if (!DbPersistenceService.isPersistenceException(e)) {
+        StringWriter memoryWriter = new StringWriter();
+        e.printStackTrace(new PrintWriter(memoryWriter));
+        job.setException(memoryWriter.toString());
+      }
+      else {
+        // allowing a transaction to proceed after a persistence exception is unsafe
+        throw e instanceof RuntimeException ? (RuntimeException) e :
+        	new JbpmException("failed to execute " + job, e);
+      }
     }
   }
 
-  private static void postJobsExecution(JbpmContext jbpmContext, long[] exclusiveJobIds)
-      throws NamingException, JMSException {
-    Services services = jbpmContext.getServices();
-    JmsMessageServiceFactory messageServiceFactory = (JmsMessageServiceFactory) services.getServiceFactory(Services.SERVICENAME_MESSAGE);
-    Destination destination = messageServiceFactory.getCommandDestination();
-
-    JmsMessageService messageService = (JmsMessageService) services.getMessageService();
-    Session session = messageService.getSession();
-    MessageProducer producer = session.createProducer(destination);
-    try {
-      Command command = new ExecuteJobsCommand(exclusiveJobIds);
-      Message message = session.createObjectMessage(command);
-      producer.send(message);
-    }
-    finally {
-      producer.close();
-    }
-  }
-
   private static Log log = LogFactory.getLog(ExecuteJobCommand.class);
 }

Deleted: jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobsCommand.java
===================================================================
--- jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobsCommand.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/enterprise/src/main/java/org/jbpm/ejb/impl/ExecuteJobsCommand.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.ejb.impl;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jbpm.JbpmContext;
-import org.jbpm.command.Command;
-import org.jbpm.job.Job;
-
-/**
- * Batch job processing command.
- * @author Alejandro Guizar
- */
-public class ExecuteJobsCommand implements Command {
-
-  private long[] jobIds;
-
-  private static final long serialVersionUID = 1L;
-  private static Log log = LogFactory.getLog(ExecuteJobsCommand.class);
-
-  public ExecuteJobsCommand(long[] jobIds) {
-    this.jobIds = jobIds;
-  }
-
-  public Object execute(JbpmContext jbpmContext) throws Exception {
-    log.debug("executing jobs " + Arrays.toString(jobIds));
-    List<Job> jobs = jbpmContext.getJobSession().loadJobs(jobIds);
-    for (Job job : jobs) {
-      job.execute(jbpmContext);
-    }
-    return null;
-  }
-
-}

Modified: jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/AbstractEnterpriseTestCase.java
===================================================================
--- jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/AbstractEnterpriseTestCase.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/AbstractEnterpriseTestCase.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -46,6 +46,7 @@
 import org.jbpm.command.SignalCommand;
 import org.jbpm.command.StartProcessInstanceCommand;
 import org.jbpm.ejb.LocalCommandServiceHome;
+import org.jbpm.graph.def.Event;
 import org.jbpm.graph.def.EventCallback;
 import org.jbpm.graph.def.ProcessDefinition;
 import org.jbpm.graph.exe.ProcessInstance;
@@ -63,6 +64,7 @@
   private static Destination commandQueue;
   private static ConnectionFactory jmsConnectionFactory;
 
+  private static final int MAX_WAIT_TIME = 30 * 1000;
   private static final Log log = LogFactory.getLog(AbstractEnterpriseTestCase.class);
 
   protected AbstractEnterpriseTestCase() {
@@ -121,10 +123,19 @@
     return processInstance.hasEnded();
   }
 
+  protected boolean waitForProcessInstanceEnd(long processInstanceId) {
+    long startTime = System.currentTimeMillis();
+    do {
+      EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 1000);
+      if (System.currentTimeMillis() - startTime > MAX_WAIT_TIME) return false;
+    } while (!hasProcessInstanceEnded(processInstanceId));
+    return true;
+  }
+
   protected Object getVariable(final long processInstanceId, final String variableName) {
     return commandService.execute(new Command() {
       private static final long serialVersionUID = 1L;
-  
+
       public Object execute(JbpmContext jbpmContext) throws Exception {
         ProcessInstance processInstance = jbpmContext.loadProcessInstance(processInstanceId);
         return processInstance.getContextInstance().getVariable(variableName);

Added: jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jbpm1952/JBPM1952Test.java
===================================================================
--- jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jbpm1952/JBPM1952Test.java	                        (rev 0)
+++ jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jbpm1952/JBPM1952Test.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -0,0 +1,72 @@
+package org.jbpm.enterprise.jbpm1952;
+
+import junit.framework.Test;
+
+import org.jboss.bpm.api.test.IntegrationTestSetup;
+import org.jbpm.enterprise.AbstractEnterpriseTestCase;
+
+/**
+ * Use JMS instead of DBMS for storing jobs, so that each job is not taken by multiple job executor
+ * threads.
+ * 
+ * @see <a href="https://jira.jboss.org/jira/browse/JBPM-1952">JBPM-1952</a>
+ * @author Alejandro Guizar
+ */
+public class JBPM1952Test extends AbstractEnterpriseTestCase {
+
+  private static final int PROCESS_EXECUTION_COUNT = 20;
+
+  public static Test suite() {
+    return new IntegrationTestSetup(JBPM1952Test.class, "enterprise-test.war");
+  }
+
+  public void testStaleStateInAsyncFork() {
+    deployProcessDefinition("<process-definition name='jbpm1952'>"
+        + "  <event type='process-end'>"
+        + "    <action expression='#{eventCallback.processEnd}' />"
+        + "  </event>"
+        + "  <start-state>"
+        + "    <transition to='a' />"
+        + "  </start-state>"
+        + "  <node name='a' async='true'>"
+        + "    <transition to='b' />"
+        + "  </node>"
+        + "  <node name='b' async='true'>"
+        + "    <transition to='fork' />"
+        + "  </node>"
+        + "  <fork name='fork'>"
+        + "    <transition to='c1' name='to_c1'/>"
+        + "    <transition to='c2' name='to_c2'/>"
+        + "    <transition to='c3' name='to_c3'/>"
+        + "  </fork>"
+        + "  <node name='c1' async='true'>"
+        + "    <transition to='join' />"
+        + "  </node>"
+        + "  <node name='c2' async='true'>"
+        + "    <transition to='join' />"
+        + "  </node>"
+        + "  <node name='c3' async='true'>"
+        + "    <transition to='join' />"
+        + "  </node>"
+        + "  <join name='join' async='exclusive'>"
+        + "    <transition to='d' />"
+        + "  </join>"
+        + "  <node name='d' async='true'>"
+        + "    <transition to='end' />"
+        + "  </node>"
+        + "  <end-state name='end'/>"
+        + "</process-definition>");
+
+    long[] processInstanceIds = new long[PROCESS_EXECUTION_COUNT];
+    for (int i = 0; i < PROCESS_EXECUTION_COUNT; i++) {
+      processInstanceIds[i] = startProcessInstance("jbpm1952").getId();
+    }
+
+    for (int i = 0; i < PROCESS_EXECUTION_COUNT; i++) {
+      long processInstanceId = processInstanceIds[i];
+      assertTrue("expected process instance " + processInstanceId + " to have ended",
+          waitForProcessInstanceEnd(processInstanceId));
+    }
+  }
+
+}

Modified: jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jms/JmsMessageTest.java
===================================================================
--- jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jms/JmsMessageTest.java	2009-02-27 22:08:53 UTC (rev 4072)
+++ jbpm3/trunk/modules/enterprise/src/test/java/org/jbpm/enterprise/jms/JmsMessageTest.java	2009-02-28 01:05:43 UTC (rev 4073)
@@ -36,8 +36,7 @@
  */
 public class JmsMessageTest extends AbstractEnterpriseTestCase {
 
-  private static final int maxWaitTime = 10 * 1000;
-  private static final int processExecutionCount = 5;
+  private static final int PROCESS_INSTANCE_COUNT = 5;
 
   public static Test suite() throws Exception {
     return new IntegrationTestSetup(JmsMessageTest.class, "enterprise-test.war");
@@ -201,29 +200,18 @@
         + "  <end-state name='end' />"
         + "</process-definition>");
 
-    long[] processInstanceIds = new long[processExecutionCount];
-    for (int i = 0; i < processExecutionCount; i++) {
+    long[] processInstanceIds = new long[PROCESS_INSTANCE_COUNT];
+    for (int i = 0; i < PROCESS_INSTANCE_COUNT; i++) {
       processInstanceIds[i] = startProcessInstance("execution").getId();
       EventCallback.waitForEvent(Event.EVENTTYPE_NODE_ENTER);
     }
-    for (int i = 0; i < processExecutionCount; i++) {
+    for (int i = 0; i < PROCESS_INSTANCE_COUNT; i++) {
       EventCallback.waitForEvent(Event.EVENTTYPE_NODE_LEAVE);
     }
-    for (int i = 0; i < processExecutionCount; i++) {
+    for (int i = 0; i < PROCESS_INSTANCE_COUNT; i++) {
       long processInstanceId = processInstanceIds[i];
       assertTrue("expected process instance " + processInstanceId + " to have ended",
           waitForProcessInstanceEnd(processInstanceId));
     }
   }
-
-  private boolean waitForProcessInstanceEnd(long processInstanceId) {
-    long startTime = System.currentTimeMillis();
-    do {
-      EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 1000);
-      if (System.currentTimeMillis() - startTime > maxWaitTime) {
-        return false;
-      }
-    } while (!hasProcessInstanceEnded(processInstanceId));
-    return true;
-  }
 }




More information about the jbpm-commits mailing list