[jbpm-commits] JBoss JBPM SVN: r4126 - in jbpm3/branches/jbpm-3.2.5.SP/modules/core/src: main/java/org/jbpm/graph/exe and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Mar 3 18:19:49 EST 2009


Author: alex.guizar at jboss.com
Date: 2009-03-03 18:19:49 -0500 (Tue, 03 Mar 2009)
New Revision: 4126

Modified:
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java
   jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java
Log:
JBPM-2043: prevent cleanup job from being created if there are no deletable jobs

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java	2009-03-03 23:19:49 UTC (rev 4126)
@@ -217,20 +217,21 @@
     }
   }
 
+  public int countDeletableJobsForProcessInstance(ProcessInstance processInstance) {
+    Number jobCount = (Number) session.getNamedQuery("JobSession.countDeletableJobsForProcessInstance")
+        .setParameter("processInstance", processInstance)
+        .uniqueResult();
+    return jobCount.intValue();
+  }
+
   public void deleteJobsForProcessInstance(ProcessInstance processInstance) {
     try {
-      // delete node execution jobs
-      int entityCount = session.getNamedQuery("JobSession.deleteExecuteNodeJobsForProcessInstance")
+      // delete unowned node-execute-jobs and timers
+      int entityCount = session.getNamedQuery("JobSession.deleteJobsForProcessInstance")
           .setParameter("processInstance", processInstance)
           .executeUpdate();
-      log.debug("deleted " + entityCount + " execute-node-jobs for " + processInstance);
+      log.debug("deleted " + entityCount + " jobs for " + processInstance);
 
-      // delete unowned timers
-      entityCount = session.getNamedQuery("JobSession.deleteTimersForProcessInstance")
-          .setParameter("processInstance", processInstance)
-          .executeUpdate();
-      log.debug("deleted " + entityCount + " timers for " + processInstance);
-
       // prevent further repetitions
       List<?> timers = session.getNamedQuery("JobSession.findRepeatingTimersForProcessInstance")
           .setParameter("processInstance", processInstance)

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java	2009-03-03 23:19:49 UTC (rev 4126)
@@ -30,8 +30,10 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.context.exe.ContextInstance;
+import org.jbpm.db.JobSession;
 import org.jbpm.graph.def.Event;
 import org.jbpm.graph.def.Identifiable;
 import org.jbpm.graph.def.Node;
@@ -45,6 +47,7 @@
 import org.jbpm.module.def.ModuleDefinition;
 import org.jbpm.module.exe.ModuleInstance;
 import org.jbpm.msg.MessageService;
+import org.jbpm.persistence.PersistenceService;
 import org.jbpm.svc.Services;
 import org.jbpm.taskmgmt.exe.TaskMgmtInstance;
 import org.jbpm.util.Clock;
@@ -363,15 +366,22 @@
         superProcessToken.signal(superExecutionContext);
       }
 
-      // make sure all the timers for this process instance are canceled after the process end updates are posted to the
-      // database
-      // NOTE Only timers should be deleted, messages should be kept.
-      MessageService messageService = (MessageService)Services.getCurrentService(Services.SERVICENAME_MESSAGE, false);
-      if (messageService != null)
+      // make sure jobs for this process instance are canceled 
+      // after the process end updates are posted to the database
+      JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+      if (jbpmContext != null)
       {
-        CleanUpProcessJob job = new CleanUpProcessJob(this);
-        job.setDueDate(new Date());
-        messageService.send(job);
+        Services services = jbpmContext.getServices();
+        MessageService messageService = services.getMessageService();
+        PersistenceService persistenceService = services.getPersistenceService();
+        if (messageService != null
+            && persistenceService != null
+            && persistenceService.getJobSession().countDeletableJobsForProcessInstance(this) > 0)
+        {
+          CleanUpProcessJob job = new CleanUpProcessJob(rootToken);
+          job.setDueDate(new Date());
+          messageService.send(job);
+        }
       }
     }
   }

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java	2009-03-03 23:19:49 UTC (rev 4126)
@@ -22,7 +22,7 @@
 package org.jbpm.job;
 
 import org.jbpm.JbpmContext;
-import org.jbpm.graph.exe.ProcessInstance;
+import org.jbpm.graph.exe.Token;
 import org.jbpm.scheduler.SchedulerService;
 
 /**
@@ -36,8 +36,8 @@
     // default constructor
   }
 
-  public CleanUpProcessJob(ProcessInstance processInstance) {
-    this.processInstance = processInstance;
+  public CleanUpProcessJob(Token token) {
+    super(token);
   }
 
   public boolean execute(JbpmContext jbpmContext) throws Exception {

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2009-03-03 23:19:49 UTC (rev 4126)
@@ -345,14 +345,25 @@
     ]]>
   </query>
 
-  <query name="JobSession.deleteTimersForProcessInstance">
+  <query name="JobSession.countDeletableJobsForProcessInstance">
     <![CDATA[
-      delete from org.jbpm.job.Timer timer
-      where timer.processInstance = :processInstance
-        and timer.lockOwner is null
+      select count(job.id)
+      from org.jbpm.job.Job job
+      where job.processInstance = :processInstance
+        and job.lockOwner is null
+        and job.class in (org.jbpm.job.ExecuteNodeJob, org.jbpm.job.Timer)
     ]]>
   </query>
 
+  <query name="JobSession.deleteJobsForProcessInstance">
+    <![CDATA[
+      delete from org.jbpm.job.Job job
+      where job.processInstance = :processInstance
+        and job.lockOwner is null
+        and job.class in (org.jbpm.job.ExecuteNodeJob, org.jbpm.job.Timer)
+    ]]>
+  </query>
+
   <query name="JobSession.findRepeatingTimersForProcessInstance">
     <![CDATA[
       select timer
@@ -362,14 +373,6 @@
     ]]>
   </query>
 
-  <query name="JobSession.deleteExecuteNodeJobsForProcessInstance">
-    <![CDATA[
-      delete from org.jbpm.job.ExecuteNodeJob job
-      where job.processInstance = :processInstance
-        and job.lockOwner is null
-    ]]>
-  </query>
-
   <query name="JobSession.findJobsByToken">
     <![CDATA[
       select job

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java	2009-03-03 23:19:49 UTC (rev 4126)
@@ -162,7 +162,7 @@
       processInstance.signal();
       jbpmContext.save(processInstance);
       assertEquals(processDefinition.getNode("end"), processInstance.getRootToken().getNode());
-      assertEquals(7, getNbrOfJobsAvailable());
+      assertEquals(6, getNbrOfJobsAvailable());
       assertEquals(0, recordedActionNumbers.size());
 
       processJobs(5000);

Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java	2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java	2009-03-03 23:19:49 UTC (rev 4126)
@@ -21,146 +21,105 @@
  */
 package org.jbpm.perf;
 
-// $Id$
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jbpm.JbpmContext;
-import org.jbpm.JbpmException;
-import org.jbpm.command.CommandService;
-import org.jbpm.command.NewProcessInstanceCommand;
-import org.jbpm.command.SignalCommand;
-import org.jbpm.command.impl.CommandServiceImpl;
+import java.util.concurrent.Semaphore;
+
 import org.jbpm.db.AbstractDbTestCase;
-import org.jbpm.graph.def.Action;
 import org.jbpm.graph.def.ActionHandler;
 import org.jbpm.graph.def.ProcessDefinition;
 import org.jbpm.graph.exe.ExecutionContext;
 import org.jbpm.graph.exe.ProcessInstance;
-import org.jbpm.graph.exe.Token;
-import org.jbpm.instantiation.Delegation;
-import org.jbpm.job.ExecuteActionJob;
-import org.jbpm.msg.MessageService;
-import org.jbpm.svc.Services;
 
 /**
- * This tests creates a number of process instances. 
- * Every instance has a call to an ActionHandler. 
- * 
+ * This tests creates a number of process instances. Every instance has a call to an ActionHandler.
  * https://jira.jboss.org/jira/browse/JBPM-2043
- *
+ * 
  * @author mvecera at redhat.com
  * @author pmacik at redhat.com
  * @author thomas.diesler at jboss.com
  * @since 18-Feb-2009
  */
 public class SimplePerformanceTest extends AbstractDbTestCase {
-   private static final Log log = LogFactory.getLog(SimplePerformanceTest.class);
-   private static final String ESB_ASYNC_SIGNAL_ACTION_NAME = "ESB_ASYNC_SIGNAL_ACTION";
-   private CommandService commandService = new CommandServiceImpl(getJbpmConfiguration());
-   private static final int INSTANCES = 10000;
-   private static int count = 1;
-   private static AtomicInteger signaled = new AtomicInteger(0);
-   private Action signalAction;
-   ProcessDefinition processDefinition;
 
-   @Override
-   public void setUp() throws Exception {
-      super.setUp();
+  private static final int WARMUP_INSTANCES = 100;
+  private static final int MEASUREMENT_INSTANCES = 1000;
 
-      processDefinition = ProcessDefinition.parseXmlString(
-              "<process-definition xmlns='urn:jbpm.org:jpdl-3.1' name='processDefinition1'>" +
-              "  <start-state name='start'>" +
-              "    <transition name='to_state' to='end'>" +
-              "      <action class='" + PerfActionHandler.class.getName() + "'/>" +
-              "    </transition>" +
-              "  </start-state>" +
-              "  <end-state name='end'/>" +
-              "</process-definition>");
+  private static final Semaphore signalLight = new Semaphore(0);
 
-         final Delegation delegation = new Delegation(AsyncSignalAction.class.getName());
-         delegation.setConfigType("constructor");
-         signalAction = new Action(delegation);
-         signalAction.setName(ESB_ASYNC_SIGNAL_ACTION_NAME);
-         processDefinition.addAction(signalAction);
+  private ProcessDefinition processDefinition;
 
-         saveAndReload(processDefinition);
-   }
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
 
-   @Override
-   public void tearDown() throws Exception {
-      beginSessionTransaction();
-      jbpmContext.getGraphSession().deleteProcessDefinition(processDefinition.getId());
-      super.tearDown();
-   }
+    processDefinition = ProcessDefinition.parseXmlString("<process-definition name='perf'>"
+        + "  <event type='process-start'>"
+        + "    <action class='"
+        + AsyncSignalAction.class.getName()
+        + "' async='true'/>"
+        + "  </event>"
+        + "  <start-state name='start'>"
+        + "    <transition to='end'/>"
+        + "  </start-state>"
+        + "  <end-state name='end'/>"
+        + "</process-definition>");
+    jbpmContext.deployProcessDefinition(processDefinition);
 
-   public void testAsyncCall() {
-      long start = System.currentTimeMillis();
+    newTransaction();
+    startJobExecutor();
+  }
 
-      startJobExecutor();
+  @Override
+  public void tearDown() throws Exception {
+    stopJobExecutor();
 
-      commitAndCloseSession();
+    newTransaction();
+    jbpmContext.getGraphSession().deleteProcessDefinition(processDefinition.getId());
 
-      for (int i = 0; i < INSTANCES; i++) {
-         beginSessionTransaction();
-         NewProcessInstanceCommand startCommand = new NewProcessInstanceCommand();
-         startCommand.setProcessDefinitionId(processDefinition.getId());
-         startCommand.setProcessDefinitionName("processDefinition1");
-         ProcessInstance pi = (ProcessInstance) commandService.execute(startCommand);
+    super.tearDown();
+  }
 
-         Token token = pi.getRootToken();        
-         final ExecuteActionJob signalJob = new ExecuteActionJob(token);
+  public void testAsyncCall() {
+    launchProcessInstances(WARMUP_INSTANCES);
 
-         signalJob.setAction(signalAction);
-         signalJob.setDueDate(new Date());
-         signalJob.setSuspended(token.isSuspended());
+    long startTime = System.currentTimeMillis();
+    launchProcessInstances(MEASUREMENT_INSTANCES);
+    long duration = System.currentTimeMillis() - startTime;
 
-         final MessageService messageService = (MessageService) Services.getCurrentService(Services.SERVICENAME_MESSAGE, true);
-         messageService.send(signalJob);
+    System.out.println("=== Test finished processing "
+        + MEASUREMENT_INSTANCES
+        + " instances in "
+        + duration
+        + "ms ===");
+    System.out.println("=== This is "
+        + Math.round(1000f * MEASUREMENT_INSTANCES / duration)
+        + " instances per second ===");
+  }
 
-         commitAndCloseSession();
-      }
+  private void launchProcessInstances(int count) {
+    for (int i = 0; i < count; i++) {
+      ProcessInstance pi = new ProcessInstance(processDefinition);
+      jbpmContext.save(pi);
+      newTransaction();
+    }
 
-      while (signaled.get() < INSTANCES) {
-         try {
-            Thread.sleep(1000);
-         } catch (InterruptedException e) {
-            // no problem, computer's fault
-         }
-      }
+    commitAndCloseSession();
+    try {
+      signalLight.acquire(count);
+    }
+    catch (InterruptedException e) {
+      fail(getName() + " got interrupted while waiting for process instances to end");
+    }
+    finally {
+      beginSessionTransaction();
+    }
+  }
 
-      stopJobExecutor();
+  public static class AsyncSignalAction implements ActionHandler {
+    private static final long serialVersionUID = -8617329370138396271L;
 
-      long stop = System.currentTimeMillis();
-      System.out.println("=== Test finished processing " + INSTANCES + " instances in " + (stop - start) + "ms ===");
-      System.out.println("=== This is " + Math.round(1000f * INSTANCES / (stop - start)) + " instances per second ===");
-   }
-
-   public static class PerfActionHandler implements ActionHandler {
-      private static final long serialVersionUID = -2171981067863454024L;
-      public void execute(ExecutionContext executionContext) throws Exception {
-         //System.out.println(count);
-         count++;
-      }
-   }
-
-   private static class AsyncSignalAction implements ActionHandler {
-      private static final long serialVersionUID = -8617329370138396271L;
-
-      AsyncSignalAction(final String configuration) throws JbpmException {
-      }
-
-      public void execute(final ExecutionContext executionContext)
-              throws Exception {
-         final Token token = executionContext.getToken();
-         final long tokenId = token.getId();
-         final JbpmContext jbpmContext = executionContext.getJbpmContext();
-
-         final SignalCommand signalCommand = new SignalCommand(tokenId, null);
-         signalCommand.execute(jbpmContext);
-
-         signaled.incrementAndGet();
-      }
-   }
+    public void execute(final ExecutionContext executionContext) throws Exception {
+      executionContext.leaveNode();
+      signalLight.release();
+    }
+  }
 }




More information about the jbpm-commits mailing list