[jbpm-commits] JBoss JBPM SVN: r4126 - in jbpm3/branches/jbpm-3.2.5.SP/modules/core/src: main/java/org/jbpm/graph/exe and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Mar 3 18:19:49 EST 2009
Author: alex.guizar at jboss.com
Date: 2009-03-03 18:19:49 -0500 (Tue, 03 Mar 2009)
New Revision: 4126
Modified:
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java
jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java
Log:
JBPM-2043: prevent cleanup job from being created if there are no deletable jobs
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/db/JobSession.java 2009-03-03 23:19:49 UTC (rev 4126)
@@ -217,20 +217,21 @@
}
}
+ public int countDeletableJobsForProcessInstance(ProcessInstance processInstance) {
+ Number jobCount = (Number) session.getNamedQuery("JobSession.countDeletableJobsForProcessInstance")
+ .setParameter("processInstance", processInstance)
+ .uniqueResult();
+ return jobCount.intValue();
+ }
+
public void deleteJobsForProcessInstance(ProcessInstance processInstance) {
try {
- // delete node execution jobs
- int entityCount = session.getNamedQuery("JobSession.deleteExecuteNodeJobsForProcessInstance")
+ // delete unowned node-execute-jobs and timers
+ int entityCount = session.getNamedQuery("JobSession.deleteJobsForProcessInstance")
.setParameter("processInstance", processInstance)
.executeUpdate();
- log.debug("deleted " + entityCount + " execute-node-jobs for " + processInstance);
+ log.debug("deleted " + entityCount + " jobs for " + processInstance);
- // delete unowned timers
- entityCount = session.getNamedQuery("JobSession.deleteTimersForProcessInstance")
- .setParameter("processInstance", processInstance)
- .executeUpdate();
- log.debug("deleted " + entityCount + " timers for " + processInstance);
-
// prevent further repetitions
List<?> timers = session.getNamedQuery("JobSession.findRepeatingTimersForProcessInstance")
.setParameter("processInstance", processInstance)
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/graph/exe/ProcessInstance.java 2009-03-03 23:19:49 UTC (rev 4126)
@@ -30,8 +30,10 @@
import java.util.List;
import java.util.Map;
+import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.context.exe.ContextInstance;
+import org.jbpm.db.JobSession;
import org.jbpm.graph.def.Event;
import org.jbpm.graph.def.Identifiable;
import org.jbpm.graph.def.Node;
@@ -45,6 +47,7 @@
import org.jbpm.module.def.ModuleDefinition;
import org.jbpm.module.exe.ModuleInstance;
import org.jbpm.msg.MessageService;
+import org.jbpm.persistence.PersistenceService;
import org.jbpm.svc.Services;
import org.jbpm.taskmgmt.exe.TaskMgmtInstance;
import org.jbpm.util.Clock;
@@ -363,15 +366,22 @@
superProcessToken.signal(superExecutionContext);
}
- // make sure all the timers for this process instance are canceled after the process end updates are posted to the
- // database
- // NOTE Only timers should be deleted, messages should be kept.
- MessageService messageService = (MessageService)Services.getCurrentService(Services.SERVICENAME_MESSAGE, false);
- if (messageService != null)
+ // make sure jobs for this process instance are canceled
+ // after the process end updates are posted to the database
+ JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+ if (jbpmContext != null)
{
- CleanUpProcessJob job = new CleanUpProcessJob(this);
- job.setDueDate(new Date());
- messageService.send(job);
+ Services services = jbpmContext.getServices();
+ MessageService messageService = services.getMessageService();
+ PersistenceService persistenceService = services.getPersistenceService();
+ if (messageService != null
+ && persistenceService != null
+ && persistenceService.getJobSession().countDeletableJobsForProcessInstance(this) > 0)
+ {
+ CleanUpProcessJob job = new CleanUpProcessJob(rootToken);
+ job.setDueDate(new Date());
+ messageService.send(job);
+ }
}
}
}
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/java/org/jbpm/job/CleanUpProcessJob.java 2009-03-03 23:19:49 UTC (rev 4126)
@@ -22,7 +22,7 @@
package org.jbpm.job;
import org.jbpm.JbpmContext;
-import org.jbpm.graph.exe.ProcessInstance;
+import org.jbpm.graph.exe.Token;
import org.jbpm.scheduler.SchedulerService;
/**
@@ -36,8 +36,8 @@
// default constructor
}
- public CleanUpProcessJob(ProcessInstance processInstance) {
- this.processInstance = processInstance;
+ public CleanUpProcessJob(Token token) {
+ super(token);
}
public boolean execute(JbpmContext jbpmContext) throws Exception {
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2009-03-03 23:19:49 UTC (rev 4126)
@@ -345,14 +345,25 @@
]]>
</query>
- <query name="JobSession.deleteTimersForProcessInstance">
+ <query name="JobSession.countDeletableJobsForProcessInstance">
<![CDATA[
- delete from org.jbpm.job.Timer timer
- where timer.processInstance = :processInstance
- and timer.lockOwner is null
+ select count(job.id)
+ from org.jbpm.job.Job job
+ where job.processInstance = :processInstance
+ and job.lockOwner is null
+ and job.class in (org.jbpm.job.ExecuteNodeJob, org.jbpm.job.Timer)
]]>
</query>
+ <query name="JobSession.deleteJobsForProcessInstance">
+ <![CDATA[
+ delete from org.jbpm.job.Job job
+ where job.processInstance = :processInstance
+ and job.lockOwner is null
+ and job.class in (org.jbpm.job.ExecuteNodeJob, org.jbpm.job.Timer)
+ ]]>
+ </query>
+
<query name="JobSession.findRepeatingTimersForProcessInstance">
<![CDATA[
select timer
@@ -362,14 +373,6 @@
]]>
</query>
- <query name="JobSession.deleteExecuteNodeJobsForProcessInstance">
- <![CDATA[
- delete from org.jbpm.job.ExecuteNodeJob job
- where job.processInstance = :processInstance
- and job.lockOwner is null
- ]]>
- </query>
-
<query name="JobSession.findJobsByToken">
<![CDATA[
select job
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/msg/command/AsyncExecutionDbTest.java 2009-03-03 23:19:49 UTC (rev 4126)
@@ -162,7 +162,7 @@
processInstance.signal();
jbpmContext.save(processInstance);
assertEquals(processDefinition.getNode("end"), processInstance.getRootToken().getNode());
- assertEquals(7, getNbrOfJobsAvailable());
+ assertEquals(6, getNbrOfJobsAvailable());
assertEquals(0, recordedActionNumbers.size());
processJobs(5000);
Modified: jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java 2009-03-03 19:52:32 UTC (rev 4125)
+++ jbpm3/branches/jbpm-3.2.5.SP/modules/core/src/test/java/org/jbpm/perf/SimplePerformanceTest.java 2009-03-03 23:19:49 UTC (rev 4126)
@@ -21,146 +21,105 @@
*/
package org.jbpm.perf;
-// $Id$
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jbpm.JbpmContext;
-import org.jbpm.JbpmException;
-import org.jbpm.command.CommandService;
-import org.jbpm.command.NewProcessInstanceCommand;
-import org.jbpm.command.SignalCommand;
-import org.jbpm.command.impl.CommandServiceImpl;
+import java.util.concurrent.Semaphore;
+
import org.jbpm.db.AbstractDbTestCase;
-import org.jbpm.graph.def.Action;
import org.jbpm.graph.def.ActionHandler;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.ProcessInstance;
-import org.jbpm.graph.exe.Token;
-import org.jbpm.instantiation.Delegation;
-import org.jbpm.job.ExecuteActionJob;
-import org.jbpm.msg.MessageService;
-import org.jbpm.svc.Services;
/**
- * This tests creates a number of process instances.
- * Every instance has a call to an ActionHandler.
- *
+ * This tests creates a number of process instances. Every instance has a call to an ActionHandler.
* https://jira.jboss.org/jira/browse/JBPM-2043
- *
+ *
* @author mvecera at redhat.com
* @author pmacik at redhat.com
* @author thomas.diesler at jboss.com
* @since 18-Feb-2009
*/
public class SimplePerformanceTest extends AbstractDbTestCase {
- private static final Log log = LogFactory.getLog(SimplePerformanceTest.class);
- private static final String ESB_ASYNC_SIGNAL_ACTION_NAME = "ESB_ASYNC_SIGNAL_ACTION";
- private CommandService commandService = new CommandServiceImpl(getJbpmConfiguration());
- private static final int INSTANCES = 10000;
- private static int count = 1;
- private static AtomicInteger signaled = new AtomicInteger(0);
- private Action signalAction;
- ProcessDefinition processDefinition;
- @Override
- public void setUp() throws Exception {
- super.setUp();
+ private static final int WARMUP_INSTANCES = 100;
+ private static final int MEASUREMENT_INSTANCES = 1000;
- processDefinition = ProcessDefinition.parseXmlString(
- "<process-definition xmlns='urn:jbpm.org:jpdl-3.1' name='processDefinition1'>" +
- " <start-state name='start'>" +
- " <transition name='to_state' to='end'>" +
- " <action class='" + PerfActionHandler.class.getName() + "'/>" +
- " </transition>" +
- " </start-state>" +
- " <end-state name='end'/>" +
- "</process-definition>");
+ private static final Semaphore signalLight = new Semaphore(0);
- final Delegation delegation = new Delegation(AsyncSignalAction.class.getName());
- delegation.setConfigType("constructor");
- signalAction = new Action(delegation);
- signalAction.setName(ESB_ASYNC_SIGNAL_ACTION_NAME);
- processDefinition.addAction(signalAction);
+ private ProcessDefinition processDefinition;
- saveAndReload(processDefinition);
- }
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
- @Override
- public void tearDown() throws Exception {
- beginSessionTransaction();
- jbpmContext.getGraphSession().deleteProcessDefinition(processDefinition.getId());
- super.tearDown();
- }
+ processDefinition = ProcessDefinition.parseXmlString("<process-definition name='perf'>"
+ + " <event type='process-start'>"
+ + " <action class='"
+ + AsyncSignalAction.class.getName()
+ + "' async='true'/>"
+ + " </event>"
+ + " <start-state name='start'>"
+ + " <transition to='end'/>"
+ + " </start-state>"
+ + " <end-state name='end'/>"
+ + "</process-definition>");
+ jbpmContext.deployProcessDefinition(processDefinition);
- public void testAsyncCall() {
- long start = System.currentTimeMillis();
+ newTransaction();
+ startJobExecutor();
+ }
- startJobExecutor();
+ @Override
+ public void tearDown() throws Exception {
+ stopJobExecutor();
- commitAndCloseSession();
+ newTransaction();
+ jbpmContext.getGraphSession().deleteProcessDefinition(processDefinition.getId());
- for (int i = 0; i < INSTANCES; i++) {
- beginSessionTransaction();
- NewProcessInstanceCommand startCommand = new NewProcessInstanceCommand();
- startCommand.setProcessDefinitionId(processDefinition.getId());
- startCommand.setProcessDefinitionName("processDefinition1");
- ProcessInstance pi = (ProcessInstance) commandService.execute(startCommand);
+ super.tearDown();
+ }
- Token token = pi.getRootToken();
- final ExecuteActionJob signalJob = new ExecuteActionJob(token);
+ public void testAsyncCall() {
+ launchProcessInstances(WARMUP_INSTANCES);
- signalJob.setAction(signalAction);
- signalJob.setDueDate(new Date());
- signalJob.setSuspended(token.isSuspended());
+ long startTime = System.currentTimeMillis();
+ launchProcessInstances(MEASUREMENT_INSTANCES);
+ long duration = System.currentTimeMillis() - startTime;
- final MessageService messageService = (MessageService) Services.getCurrentService(Services.SERVICENAME_MESSAGE, true);
- messageService.send(signalJob);
+ System.out.println("=== Test finished processing "
+ + MEASUREMENT_INSTANCES
+ + " instances in "
+ + duration
+ + "ms ===");
+ System.out.println("=== This is "
+ + Math.round(1000f * MEASUREMENT_INSTANCES / duration)
+ + " instances per second ===");
+ }
- commitAndCloseSession();
- }
+ private void launchProcessInstances(int count) {
+ for (int i = 0; i < count; i++) {
+ ProcessInstance pi = new ProcessInstance(processDefinition);
+ jbpmContext.save(pi);
+ newTransaction();
+ }
- while (signaled.get() < INSTANCES) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // no problem, computer's fault
- }
- }
+ commitAndCloseSession();
+ try {
+ signalLight.acquire(count);
+ }
+ catch (InterruptedException e) {
+ fail(getName() + " got interrupted while waiting for process instances to end");
+ }
+ finally {
+ beginSessionTransaction();
+ }
+ }
- stopJobExecutor();
+ public static class AsyncSignalAction implements ActionHandler {
+ private static final long serialVersionUID = -8617329370138396271L;
- long stop = System.currentTimeMillis();
- System.out.println("=== Test finished processing " + INSTANCES + " instances in " + (stop - start) + "ms ===");
- System.out.println("=== This is " + Math.round(1000f * INSTANCES / (stop - start)) + " instances per second ===");
- }
-
- public static class PerfActionHandler implements ActionHandler {
- private static final long serialVersionUID = -2171981067863454024L;
- public void execute(ExecutionContext executionContext) throws Exception {
- //System.out.println(count);
- count++;
- }
- }
-
- private static class AsyncSignalAction implements ActionHandler {
- private static final long serialVersionUID = -8617329370138396271L;
-
- AsyncSignalAction(final String configuration) throws JbpmException {
- }
-
- public void execute(final ExecutionContext executionContext)
- throws Exception {
- final Token token = executionContext.getToken();
- final long tokenId = token.getId();
- final JbpmContext jbpmContext = executionContext.getJbpmContext();
-
- final SignalCommand signalCommand = new SignalCommand(tokenId, null);
- signalCommand.execute(jbpmContext);
-
- signaled.incrementAndGet();
- }
- }
+ public void execute(final ExecutionContext executionContext) throws Exception {
+ executionContext.leaveNode();
+ signalLight.release();
+ }
+ }
}
More information about the jbpm-commits
mailing list