[jbpm-commits] JBoss JBPM SVN: r5223 - in jbpm3/branches/jbpm-3.2-soa/modules/core/src: main/java/org/jbpm/job/executor and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Jul 4 09:41:56 EDT 2009
Author: alex.guizar at jboss.com
Date: 2009-07-04 09:41:56 -0400 (Sat, 04 Jul 2009)
New Revision: 5223
Modified:
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/db/JobSession.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
Log:
[JBPM-2375] prevent job executor threads from committing after an Error is thrown
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/db/JobSession.java 2009-07-04 11:53:02 UTC (rev 5222)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/db/JobSession.java 2009-07-04 13:41:56 UTC (rev 5223)
@@ -26,8 +26,6 @@
import java.util.Iterator;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.hibernate.HibernateException;
import org.hibernate.LockMode;
import org.hibernate.Query;
@@ -119,7 +117,6 @@
public void deleteJob(Job job) {
try {
session.delete(job);
- log.debug("deleted " + job);
}
catch (HibernateException e) {
throw new JbpmPersistenceException("could not delete " + job, e);
@@ -189,19 +186,16 @@
public void deleteTimersByName(String name, Token token) {
try {
// delete unowned timers
- int entityCount =
- session.getNamedQuery("JobSession.deleteTimersByName")
- .setString("name", name)
- .setParameter("token", token)
- .executeUpdate();
- log.debug("deleted " + entityCount + " timers by name '" + name + "' for " + token);
+ session.getNamedQuery("JobSession.deleteTimersByName")
+ .setString("name", name)
+ .setParameter("token", token)
+ .executeUpdate();
// prevent further repetitions
- List timers =
- session.getNamedQuery("JobSession.findRepeatingTimersByName")
- .setString("name", name)
- .setParameter("token", token)
- .list();
+ List timers = session.getNamedQuery("JobSession.findRepeatingTimersByName")
+ .setString("name", name)
+ .setParameter("token", token)
+ .list();
preventFurtherRepetitions(timers);
}
catch (HibernateException e) {
@@ -213,27 +207,23 @@
}
public int countDeletableJobsForProcessInstance(ProcessInstance processInstance) {
- Number jobCount =
- (Number) session.getNamedQuery("JobSession.countDeletableJobsForProcessInstance")
- .setParameter("processInstance", processInstance)
- .uniqueResult();
+ Number jobCount = (Number) session.getNamedQuery("JobSession.countDeletableJobsForProcessInstance")
+ .setParameter("processInstance", processInstance)
+ .uniqueResult();
return jobCount.intValue();
}
public void deleteJobsForProcessInstance(ProcessInstance processInstance) {
try {
// delete unowned node-execute-jobs and timers
- int entityCount =
- session.getNamedQuery("JobSession.deleteJobsForProcessInstance")
- .setParameter("processInstance", processInstance)
- .executeUpdate();
- log.debug("deleted " + entityCount + " jobs for " + processInstance);
+ session.getNamedQuery("JobSession.deleteJobsForProcessInstance")
+ .setParameter("processInstance", processInstance)
+ .executeUpdate();
// prevent further repetitions
- List timers =
- session.getNamedQuery("JobSession.findRepeatingTimersForProcessInstance")
- .setParameter("processInstance", processInstance)
- .list();
+ List timers = session.getNamedQuery("JobSession.findRepeatingTimersForProcessInstance")
+ .setParameter("processInstance", processInstance)
+ .list();
preventFurtherRepetitions(timers);
}
catch (HibernateException e) {
@@ -247,7 +237,6 @@
Timer timer = (Timer) i.next();
timer.setRepeat(null);
}
- log.debug("prevented further repetitions of " + timers);
}
}
@@ -262,5 +251,4 @@
}
}
- private static Log log = LogFactory.getLog(JobSession.class);
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2009-07-04 11:53:02 UTC (rev 5222)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2009-07-04 13:41:56 UTC (rev 5223)
@@ -27,7 +27,6 @@
final int maxIdleInterval;
final long maxLockTime;
- int currentIdleInterval;
volatile boolean isActive = true;
public JobExecutorThread(String name, JobExecutor jobExecutor) {
@@ -55,18 +54,16 @@
}
public void run() {
- currentIdleInterval = idleInterval;
+ int currentIdleInterval = idleInterval;
while (isActive) {
try {
Collection acquiredJobs = acquireJobs();
- if (!acquiredJobs.isEmpty()) {
- for (Iterator i = acquiredJobs.iterator(); i.hasNext() && isActive;) {
- Job job = (Job) i.next();
- executeJob(job);
- }
+ for (Iterator i = acquiredJobs.iterator(); i.hasNext() && isActive;) {
+ Job job = (Job) i.next();
+ executeJob(job);
}
if (isActive) {
- long waitPeriod = getWaitPeriod();
+ long waitPeriod = getWaitPeriod(currentIdleInterval);
if (waitPeriod > 0) {
synchronized (jobExecutor) {
jobExecutor.wait(waitPeriod);
@@ -77,26 +74,26 @@
currentIdleInterval = idleInterval;
}
catch (RuntimeException e) {
- log.error("exception in job executor thread. waiting " +
- currentIdleInterval +
- " milliseconds", e);
- try {
- synchronized (jobExecutor) {
- jobExecutor.wait(currentIdleInterval);
+ if (isActive) {
+ log.error("exception in " + getName() + ", waiting " + currentIdleInterval + " ms", e);
+ try {
+ synchronized (jobExecutor) {
+ jobExecutor.wait(currentIdleInterval);
+ }
}
+ catch (InterruptedException ie) {
+ log.debug("delay after exception got interrupted", ie);
+ }
+ // after an exception, the current idle interval is doubled to prevent
+ // continuous exception generation when e.g. the db is unreachable
+ currentIdleInterval <<= 1;
+ if (currentIdleInterval > maxIdleInterval || currentIdleInterval < 0) {
+ currentIdleInterval = maxIdleInterval;
+ }
}
- catch (InterruptedException ie) {
- log.debug("delay after exception got interrupted", ie);
- }
- // after an exception, the current idle interval is doubled to prevent
- // continuous exception generation when e.g. the db is unreachable
- currentIdleInterval <<= 1;
- if (currentIdleInterval > maxIdleInterval || currentIdleInterval < 0) {
- currentIdleInterval = maxIdleInterval;
- }
}
catch (InterruptedException e) {
- log.info("job executor thread '" + getName() + "' got interrupted");
+ log.info(getName() + " got interrupted");
}
}
log.info(getName() + " leaves cyberspace");
@@ -104,28 +101,25 @@
protected Collection acquireJobs() {
Collection acquiredJobs;
+
synchronized (jobExecutor) {
- log.debug("acquiring jobs for execution...");
List jobsToLock = Collections.EMPTY_LIST;
+
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
JobSession jobSession = jbpmContext.getJobSession();
+
String lockOwner = getName();
- log.debug("querying for acquirable job...");
Job job = jobSession.getFirstAcquirableJob(lockOwner);
if (job != null) {
if (job.isExclusive()) {
- log.debug("obtained exclusive " + job);
ProcessInstance processInstance = job.getProcessInstance();
- log.debug("finding other exclusive jobs for " + processInstance);
+ log.debug("finding exclusive jobs for " + processInstance);
jobsToLock = jobSession.findExclusiveJobs(lockOwner, processInstance);
- log.debug("trying to obtain exclusive locks on " +
- jobsToLock +
- " for " +
- processInstance);
+ log.debug("acquiring " + jobsToLock + " for " + processInstance);
}
else {
- log.debug("trying to obtain lock on " + job);
+ log.debug("acquiring " + job);
jobsToLock = Collections.singletonList(job);
}
@@ -136,14 +130,15 @@
job.setLockTime(lockTime);
}
}
- else {
- log.debug("no acquirable jobs in job table");
- }
}
catch (RuntimeException e) {
jbpmContext.setRollbackOnly();
throw e;
}
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
finally {
try {
jbpmContext.close();
@@ -153,8 +148,8 @@
catch (RuntimeException e) {
if (!DbPersistenceService.isLockingException(e)) throw e;
// if this is a locking exception, keep it quiet
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
- "failed to acquire lock on jobs " + jobsToLock);
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
+ .error("failed to acquire lock on jobs " + jobsToLock);
acquiredJobs = Collections.EMPTY_LIST;
}
}
@@ -191,6 +186,10 @@
jbpmContext.setRollbackOnly();
}
}
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
// if this job is locked too long
long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
@@ -205,29 +204,49 @@
catch (RuntimeException e) {
if (!DbPersistenceService.isLockingException(e)) throw e;
// if this is a locking exception, keep it quiet
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
- "failed to complete job " + job);
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to complete job " +
+ job);
}
}
}
+ protected long getWaitPeriod(int currentIdleInterval) {
+ long waitPeriod = currentIdleInterval;
+
+ Date nextDueDate = getNextDueDate();
+ if (nextDueDate != null) {
+ long nextDueTime = nextDueDate.getTime();
+ long currentTime = System.currentTimeMillis();
+
+ if (nextDueTime < currentTime + currentIdleInterval) {
+ waitPeriod = nextDueTime - currentTime;
+ if (waitPeriod < 0) waitPeriod = 0;
+ }
+ }
+ return waitPeriod;
+ }
+
protected Date getNextDueDate() {
Date nextDueDate = null;
- String threadName = getName();
+ String lockOwner = getName();
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
JobSession jobSession = jbpmContext.getJobSession();
Collection jobIdsToIgnore = jobExecutor.getMonitoredJobIds();
- Job job = jobSession.getFirstDueJob(threadName, jobIdsToIgnore);
+ Job job = jobSession.getFirstDueJob(lockOwner, jobIdsToIgnore);
if (job != null) {
nextDueDate = job.getDueDate();
- jobExecutor.addMonitoredJobId(threadName, job.getId());
+ jobExecutor.addMonitoredJobId(lockOwner, job.getId());
}
}
catch (RuntimeException e) {
jbpmContext.setRollbackOnly();
throw e;
}
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
finally {
try {
jbpmContext.close();
@@ -235,30 +254,14 @@
catch (RuntimeException e) {
if (!DbPersistenceService.isLockingException(e)) throw e;
// if this is a locking exception, keep it quiet
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
- "failed to determine next due date for job executor thread " + threadName);
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
+ .error("failed to determine next due date for job executor thread " + lockOwner);
nextDueDate = null;
}
}
return nextDueDate;
}
- protected long getWaitPeriod() {
- long interval = currentIdleInterval;
- Date nextDueDate = getNextDueDate();
- if (nextDueDate != null) {
- long currentTime = System.currentTimeMillis();
- long nextDueTime = nextDueDate.getTime();
- if (nextDueTime < currentTime + currentIdleInterval) {
- interval = nextDueTime - currentTime;
- }
- }
- if (interval < 0) {
- interval = 0;
- }
- return interval;
- }
-
/**
* @deprecated As of jBPM 3.2.3, replaced by {@link #deactivate()}
*/
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2009-07-04 11:53:02 UTC (rev 5222)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2009-07-04 13:41:56 UTC (rev 5223)
@@ -9,7 +9,6 @@
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
-import org.jbpm.db.JobSession;
import org.jbpm.job.Job;
import org.jbpm.persistence.db.DbPersistenceService;
import org.jbpm.persistence.db.StaleObjectLogConfigurer;
@@ -18,14 +17,13 @@
public static final String DEFAULT_NAME = "LockMonitorThread";
- JbpmConfiguration jbpmConfiguration;
- int lockMonitorInterval;
- int maxLockTime;
- int lockBufferTime;
+ final JbpmConfiguration jbpmConfiguration;
+ final int lockMonitorInterval;
+ final int maxLockTime;
+ final int lockBufferTime;
- int currentLockMonitorInterval;
volatile boolean isActive = true;
-
+
public LockMonitorThread(JobExecutor jobExecutor) {
this(DEFAULT_NAME, jobExecutor);
}
@@ -36,7 +34,7 @@
lockMonitorInterval = jobExecutor.getLockMonitorInterval();
maxLockTime = jobExecutor.getMaxLockTime();
lockBufferTime = jobExecutor.getLockBufferTime();
- }
+ }
/** @deprecated As of jBPM 3.2.6, replaced by {@link #LockMonitorThread(JobExecutor)} */
public LockMonitorThread(JbpmConfiguration jbpmConfiguration, int lockMonitorInterval,
@@ -48,52 +46,36 @@
}
public void run() {
- try {
- while (isActive) {
+ while (isActive) {
+ try {
+ unlockOverdueJobs();
+ jbpmConfiguration.getJobExecutor().ensureThreadsAreActive();
+ }
+ catch (RuntimeException e) {
+ log.error("exception in " + getName(), e);
+ }
+ if (isActive) {
try {
- unlockOverdueJobs();
- jbpmConfiguration.getJobExecutor().ensureThreadsAreActive();
- if (isActive && lockMonitorInterval > 0) {
- sleep(lockMonitorInterval);
- }
+ sleep(lockMonitorInterval);
}
catch (InterruptedException e) {
- log.info("lock monitor thread '" + getName() + "' got interrupted");
+ log.info(getName() + " got interrupted");
}
- catch (Exception e) {
- log.error("exception in lock monitor thread. waiting "
- + lockMonitorInterval
- + " milliseconds", e);
- try {
- sleep(lockMonitorInterval);
- }
- catch (InterruptedException e2) {
- log.debug("delay after exception got interrupted", e2);
- }
- }
}
}
- catch (Exception e) {
- log.error("exception in lock monitor thread", e);
- }
- finally {
- log.info(getName() + " leaves cyberspace");
- }
+ log.info(getName() + " leaves cyberspace");
}
protected void unlockOverdueJobs() {
- List overdueJobs = null;
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
Date threshold = new Date(System.currentTimeMillis() - maxLockTime - lockBufferTime);
- JobSession jobSession = jbpmContext.getJobSession();
- overdueJobs = jobSession.findJobsWithOverdueLockTime(threshold);
+ List overdueJobs = jbpmContext.getJobSession().findJobsWithOverdueLockTime(threshold);
for (Iterator i = overdueJobs.iterator(); i.hasNext();) {
Job job = (Job) i.next();
- if(log.isDebugEnabled())
- {
- log.debug("unlocking " + job + " owned by thread " + job.getLockOwner());
- }
+ if (log.isDebugEnabled()) {
+ log.debug("unlocking " + job + " owned by " + job.getLockOwner());
+ }
job.setLockOwner(null);
job.setLockTime(null);
}
@@ -102,6 +84,10 @@
jbpmContext.setRollbackOnly();
throw e;
}
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
finally {
try {
jbpmContext.close();
@@ -109,8 +95,8 @@
catch (RuntimeException e) {
if (!DbPersistenceService.isLockingException(e)) throw e;
// if this is a locking exception, keep it quiet
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error(
- "could not unlock overdue jobs: " + overdueJobs);
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
+ .error("could not unlock overdue jobs");
}
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java 2009-07-04 11:53:02 UTC (rev 5222)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java 2009-07-04 13:41:56 UTC (rev 5223)
@@ -1,14 +1,10 @@
package org.jbpm.jbpm2375;
-import org.jbpm.JbpmConfiguration;
import org.jbpm.db.AbstractDbTestCase;
import org.jbpm.graph.def.ActionHandler;
-import org.jbpm.graph.def.Event;
-import org.jbpm.graph.def.EventCallback;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.ProcessInstance;
-import org.jbpm.job.executor.JobExecutor;
/**
* Test if the JobExecutorThread recovers from an Error
@@ -20,102 +16,83 @@
public class JBPM2375Test extends AbstractDbTestCase {
private static final int TEST_TIMEOUT = 10 * 1000;
+ private static boolean throwError;
- private JobExecutor jobExecutor = new JobExecutor();
- private long processDefinitionId;
-
+ private ProcessDefinition processDefinition;
+
// a process definition with two timers moving the token forward
// the second state has an action associated with the node-enter event,
// which can simulate an Error condition by throwing a NoClassDefFoundError
- private static final String PROCESS_DEFINITION = "<process-definition name='jbpm2375-timer-error-test'>"
- + " <event type='process-end'>"
- + " <action expression='#{eventCallback.processEnd}' />"
- + " </event>"
- + " <start-state name='start'>"
- + " <transition to='state1' name='to_state1'/>"
- + " </start-state>"
- + " <state name='state1'>"
- + " <timer name='moveToDefaultEndAfter1second' duedate='1 second' transition='to_state2'/>"
- + " <transition to='state2' name='to_state2'/>"
- + " </state>"
- + " <state name='state2'>"
- + " <timer name='moveToEndAfter1second' duedate='1 second' transition='to_end'/>"
- + " <event type='node-enter'>"
- + " <action name='exceptionTest' class='"
- + TimerExceptionAction.class.getName()
- + "'>"
- + " </action>"
- + " </event>"
- + " <transition to='end' name='to_end'/>"
- + " </state>"
- + " <end-state name='end' />"
- + "</process-definition>";
-
-
+ private static final String PROCESS_DEFINITION = "<process-definition name='jbpm2375-test'>" +
+ " <start-state name='start'>" +
+ " <transition to='state1' name='to_state1'/>" +
+ " </start-state>" +
+ " <state name='state1'>" +
+ " <timer name='moveToDefaultEndAfter1second' duedate='1 second' transition='to_state2'/>" +
+ " <transition to='state2' name='to_state2'/>" +
+ " </state>" +
+ " <state name='state2'>" +
+ " <timer name='moveToEndAfter1second' duedate='1 second' transition='to_end'/>" +
+ " <event type='node-enter'>" +
+ " <action name='exceptionTest' class='" +
+ TimerExceptionAction.class.getName() +
+ "'>" +
+ " </action>" +
+ " </event>" +
+ " <transition to='end' name='to_end'/>" +
+ " </state>" +
+ " <end-state name='end' />" +
+ "</process-definition>";
+
protected void setUp() throws Exception {
super.setUp();
+ jbpmConfiguration.getJobExecutor().setLockMonitorInterval(TEST_TIMEOUT / 2);
- ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(PROCESS_DEFINITION);
+ processDefinition = ProcessDefinition.parseXmlString(PROCESS_DEFINITION);
jbpmContext.deployProcessDefinition(processDefinition);
- newTransaction();
- processDefinitionId = processDefinition.getId();
- getJbpmConfiguration().getJobExecutor().setLockMonitorInterval(TEST_TIMEOUT/2);
- startJobExecutor();
-
-
+ newTransaction();
}
protected void tearDown() throws Exception {
- stopJobExecutor();
+ graphSession.deleteProcessDefinition(processDefinition.getId());
- graphSession.deleteProcessDefinition(processDefinitionId);
-
- EventCallback.clear();
+ jbpmConfiguration.getJobExecutor().setLockMonitorInterval(60000);
super.tearDown();
- }
+ }
// check if the process ends correctly if no Error is thrown
public void testTimerWithoutErrorAction() {
- runTimerErrorAction(Boolean.FALSE);
+ throwError = false;
+ runTimerErrorAction();
}
// check if the process ends correctly if an Error is thrown in the ActionHandler
public void testTimerWithErrorAction() {
- runTimerErrorAction(Boolean.TRUE);
+ throwError = true;
+ runTimerErrorAction();
}
- private void runTimerErrorAction(Boolean withError) {
-
+ private void runTimerErrorAction() {
// kick off process instance
- ProcessDefinition processDefinition = graphSession.loadProcessDefinition(processDefinitionId);
ProcessInstance processInstance = new ProcessInstance(processDefinition);
- processInstance.getContextInstance().setVariable("eventCallback", new EventCallback());
- processInstance.getContextInstance().setVariable("throwError", withError);
processInstance.signal();
jbpmContext.save(processInstance);
- commitAndCloseSession();
- try {
- EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, TEST_TIMEOUT);
+ processJobs(TEST_TIMEOUT);
- waitForJobs(TEST_TIMEOUT);
-
- } finally {
- beginSessionTransaction();
- }
+ processInstance = jbpmContext.loadProcessInstance(processInstance.getId());
+ assert processInstance.hasEnded() : processInstance;
+ }
- }
-
-
public static class TimerExceptionAction implements ActionHandler {
-
private static final long serialVersionUID = 1L;
public void execute(ExecutionContext executionContext) throws Exception {
- Boolean throwError = (Boolean)executionContext.getVariable("throwError");
- if (throwError.booleanValue())
- throw new NoClassDefFoundError("org.jbpm.no.class.Class");
+ if (throwError) {
+ throwError = false;
+ throw new NoClassDefFoundError("org.jbpm.no.such.Class");
+ }
}
}
}
More information about the jbpm-commits mailing list