[jbpm-commits] JBoss JBPM SVN: r5208 - in jbpm3/branches/jbpm-3.2-soa/modules/core/src: test/java/org/jbpm and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jul 3 05:14:48 EDT 2009
Author: mputz
Date: 2009-07-03 05:14:48 -0400 (Fri, 03 Jul 2009)
New Revision: 5208
Added:
jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/
jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
Modified:
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
Log:
Fix for JBPM-2375 - JobExecutorThread is terminated after an Error: LockMonitorThread restarts terminated JobExecutorThreads.
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2009-07-03 09:03:10 UTC (rev 5207)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2009-07-03 09:14:48 UTC (rev 5208)
@@ -48,7 +48,8 @@
for (int i = 0; i < nbrOfThreads; i++) {
startThread();
}
- lockMonitorThread = new LockMonitorThread(jbpmConfiguration, lockMonitorInterval, maxLockTime, lockBufferTime);
+ lockMonitorThread = new LockMonitorThread(getLockMonitorThreadName(), this);
+ lockMonitorThread.start();
isStarted = true;
}
else {
@@ -92,9 +93,30 @@
if (lockMonitorThread != null) lockMonitorThread.join();
}
+
+ public void ensureThreadsAreActive() {
+ List deadThreads = new ArrayList();
+ for (Iterator i = threads.values().iterator(); i.hasNext();) {
+ Thread thread = (Thread) i.next();
+ if (!thread.isAlive()) {
+ if(log.isDebugEnabled())
+ {
+ log.debug("detected dead thread '" + thread.getName() + "'");
+ }
+ deadThreads.add(thread.getName());
+ i.remove();
+ }
+ }
+ for (int i = 0; i<deadThreads.size(); i++) {
+ startThread((String)deadThreads.get(i));
+ }
+ }
+
+ protected void startThread() {
+ startThread(getNextThreadName());
+ }
- protected synchronized void startThread() {
- String threadName = getNextThreadName();
+ protected synchronized void startThread(String threadName) {
Thread thread = createThread(threadName);
threads.put(threadName, thread);
@@ -117,6 +139,10 @@
private String getThreadName(int index) {
return name + ":" + getHostAddress() + ":" + index;
}
+
+ private String getLockMonitorThreadName() {
+ return name + ':' + LockMonitorThread.DEFAULT_NAME + '@' + getHostAddress();
+ }
private static String getHostAddress() {
if (hostName == null) {
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2009-07-03 09:03:10 UTC (rev 5207)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2009-07-03 09:14:48 UTC (rev 5208)
@@ -16,57 +16,69 @@
public class LockMonitorThread extends Thread {
- final JbpmConfiguration jbpmConfiguration;
- final int lockMonitorInterval;
- final int maxLockMonitorInterval;
- final int maxLockTime;
- final int lockBufferTime;
+ public static final String DEFAULT_NAME = "LockMonitorThread";
+ JbpmConfiguration jbpmConfiguration;
+ int lockMonitorInterval;
+ int maxLockTime;
+ int lockBufferTime;
+
int currentLockMonitorInterval;
volatile boolean isActive = true;
+
+ public LockMonitorThread(JobExecutor jobExecutor) {
+ this(DEFAULT_NAME, jobExecutor);
+ }
+ public LockMonitorThread(String name, JobExecutor jobExecutor) {
+ super(name);
+ jbpmConfiguration = jobExecutor.getJbpmConfiguration();
+ lockMonitorInterval = jobExecutor.getLockMonitorInterval();
+ maxLockTime = jobExecutor.getMaxLockTime();
+ lockBufferTime = jobExecutor.getLockBufferTime();
+ }
+
+ /** @deprecated As of jBPM 3.2.6, replaced by {@link #LockMonitorThread(JobExecutor)} */
public LockMonitorThread(JbpmConfiguration jbpmConfiguration, int lockMonitorInterval,
int maxLockTime, int lockBufferTime) {
this.jbpmConfiguration = jbpmConfiguration;
this.lockMonitorInterval = lockMonitorInterval;
- this.maxLockMonitorInterval = jbpmConfiguration.getJobExecutor().getMaxIdleInterval();
this.maxLockTime = maxLockTime;
this.lockBufferTime = lockBufferTime;
}
public void run() {
- currentLockMonitorInterval = lockMonitorInterval;
- while (isActive) {
- try {
- unlockOverdueJobs();
- if (isActive) {
- sleep(currentLockMonitorInterval);
- }
- // no exception, reset current lock monitor interval
- currentLockMonitorInterval = lockMonitorInterval;
- }
- catch (RuntimeException e) {
- log.error("exception in lock monitor thread. waiting " +
- currentLockMonitorInterval +
- " milliseconds", e);
+ try {
+ while (isActive) {
try {
- sleep(currentLockMonitorInterval);
+ unlockOverdueJobs();
+ jbpmConfiguration.getJobExecutor().ensureThreadsAreActive();
+ if (isActive && lockMonitorInterval > 0) {
+ sleep(lockMonitorInterval);
+ }
}
- catch (InterruptedException ie) {
- log.debug("delay after exception got interrupted", ie);
+ catch (InterruptedException e) {
+ log.info("lock monitor thread '" + getName() + "' got interrupted");
}
- // after an exception, double the current lock monitor interval to prevent
- // continuous exception generation when e.g. the db is unreachable
- currentLockMonitorInterval <<= 1;
- if (currentLockMonitorInterval > maxLockMonitorInterval || currentLockMonitorInterval < 0) {
- currentLockMonitorInterval = maxLockMonitorInterval;
+ catch (Exception e) {
+ log.error("exception in lock monitor thread. waiting "
+ + lockMonitorInterval
+ + " milliseconds", e);
+ try {
+ sleep(lockMonitorInterval);
+ }
+ catch (InterruptedException e2) {
+ log.debug("delay after exception got interrupted", e2);
+ }
}
}
- catch (InterruptedException e) {
- log.info("lock monitor thread '" + getName() + "' got interrupted");
- }
}
- log.info(getName() + " leaves cyberspace");
+ catch (Exception e) {
+ log.error("exception in lock monitor thread", e);
+ }
+ finally {
+ log.info(getName() + " leaves cyberspace");
+ }
}
protected void unlockOverdueJobs() {
@@ -78,7 +90,10 @@
overdueJobs = jobSession.findJobsWithOverdueLockTime(threshold);
for (Iterator i = overdueJobs.iterator(); i.hasNext();) {
Job job = (Job) i.next();
- log.debug("unlocking " + job + " owned by thread " + job.getLockOwner());
+ if(log.isDebugEnabled())
+ {
+ log.debug("unlocking " + job + " owned by thread " + job.getLockOwner());
+ }
job.setLockOwner(null);
job.setLockTime(null);
}
Added: jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java 2009-07-03 09:14:48 UTC (rev 5208)
@@ -0,0 +1,121 @@
+package org.jbpm.jbpm2375;
+
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.db.AbstractDbTestCase;
+import org.jbpm.graph.def.ActionHandler;
+import org.jbpm.graph.def.Event;
+import org.jbpm.graph.def.EventCallback;
+import org.jbpm.graph.def.ProcessDefinition;
+import org.jbpm.graph.exe.ExecutionContext;
+import org.jbpm.graph.exe.ProcessInstance;
+import org.jbpm.job.executor.JobExecutor;
+
+/**
+ * Test if the JobExecutorThread recovers from an Error
+ *
+ * @see <a href="https://jira.jboss.org/jira/browse/JBPM-2375">JBPM-2375</a>
+ * @author mputz at redhat.com
+ * @since 30-Jun-2009
+ */
+public class JBPM2375Test extends AbstractDbTestCase {
+
+ private static final int TEST_TIMEOUT = 10 * 1000;
+
+ private JobExecutor jobExecutor = new JobExecutor();
+ private long processDefinitionId;
+
+ // a process definition with two timers moving the token forward
+ // the second state has an action associated with the node-enter event,
+ // which can simulate an Error condition by throwing a NoClassDefFoundError
+ private static final String PROCESS_DEFINITION = "<process-definition name='jbpm2375-timer-error-test'>"
+ + " <event type='process-end'>"
+ + " <action expression='#{eventCallback.processEnd}' />"
+ + " </event>"
+ + " <start-state name='start'>"
+ + " <transition to='state1' name='to_state1'/>"
+ + " </start-state>"
+ + " <state name='state1'>"
+ + " <timer name='moveToDefaultEndAfter1second' duedate='1 second' transition='to_state2'/>"
+ + " <transition to='state2' name='to_state2'/>"
+ + " </state>"
+ + " <state name='state2'>"
+ + " <timer name='moveToEndAfter1second' duedate='1 second' transition='to_end'/>"
+ + " <event type='node-enter'>"
+ + " <action name='exceptionTest' class='"
+ + TimerExceptionAction.class.getName()
+ + "'>"
+ + " </action>"
+ + " </event>"
+ + " <transition to='end' name='to_end'/>"
+ + " </state>"
+ + " <end-state name='end' />"
+ + "</process-definition>";
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(PROCESS_DEFINITION);
+ jbpmContext.deployProcessDefinition(processDefinition);
+ newTransaction();
+ processDefinitionId = processDefinition.getId();
+
+ getJbpmConfiguration().getJobExecutor().setLockMonitorInterval(TEST_TIMEOUT/2);
+ startJobExecutor();
+
+
+ }
+
+ protected void tearDown() throws Exception {
+ stopJobExecutor();
+
+ graphSession.deleteProcessDefinition(processDefinitionId);
+
+ EventCallback.clear();
+ super.tearDown();
+ }
+
+ // check if the process ends correctly if no Error is thrown
+ public void testTimerWithoutErrorAction() {
+ runTimerErrorAction(Boolean.FALSE);
+ }
+
+ // check if the process ends correctly if an Error is thrown in the ActionHandler
+ public void testTimerWithErrorAction() {
+ runTimerErrorAction(Boolean.TRUE);
+ }
+
+ private void runTimerErrorAction(Boolean withError) {
+
+ // kick off process instance
+ ProcessDefinition processDefinition = graphSession.loadProcessDefinition(processDefinitionId);
+ ProcessInstance processInstance = new ProcessInstance(processDefinition);
+ processInstance.getContextInstance().setVariable("eventCallback", new EventCallback());
+ processInstance.getContextInstance().setVariable("throwError", withError);
+ processInstance.signal();
+ jbpmContext.save(processInstance);
+
+ commitAndCloseSession();
+ try {
+ EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, TEST_TIMEOUT);
+
+ waitForJobs(TEST_TIMEOUT);
+
+ } finally {
+ beginSessionTransaction();
+ }
+
+ }
+
+
+ public static class TimerExceptionAction implements ActionHandler {
+
+ private static final long serialVersionUID = 1L;
+
+ public void execute(ExecutionContext executionContext) throws Exception {
+ Boolean throwError = (Boolean)executionContext.getVariable("throwError");
+ if (throwError.booleanValue())
+ throw new NoClassDefFoundError("org.jbpm.no.class.Class");
+ }
+ }
+}
More information about the jbpm-commits mailing list