[jbpm-commits] JBoss JBPM SVN: r6920 - in jbpm3/branches/3.2.10.SP/core/src/main: java/org/jbpm/job/executor and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri May 13 11:09:03 EDT 2011
Author: jcoleman at redhat.com
Date: 2011-05-13 11:09:03 -0400 (Fri, 13 May 2011)
New Revision: 6920
Modified:
jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java
jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
Log:
Pull up revisions 6918, 6919 from jbpm-3.2-soa branch:
SOA-3007.
Fix problems with the split of the old JobExecutorThread(s) into the new
DispatcherThread + JobExecutorThread(s):
o the DispatcherThread busy-waits in a loop (and uses CPU)
o job processing could be deferred for an indeterminate time.
o crashing at certain points could leave jobs locked (not processed)
indefinitely
Fix by kconner at .
Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java 2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java 2011-05-13 15:09:03 UTC (rev 6920)
@@ -275,4 +275,14 @@
}
return session.createCriteria(Job.class).add(Restrictions.in("id", jobs)).list();
}
+
+ public void releaseLockedJobs(final String lockOwner) {
+ try {
+ session.getNamedQuery("JobSession.releaseLockedJobs")
+ .setString("lockOwner", lockOwner)
+ .executeUpdate();
+ } catch (HibernateException e) {
+ throw new JbpmPersistenceException("could not release locked jobs by owner '" + lockOwner + "'", e);
+ }
+ }
}
Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java 2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java 2011-05-13 15:09:03 UTC (rev 6920)
@@ -52,40 +52,28 @@
}
public void run() {
- int retryInterval = jobExecutor.getRetryInterval();
while (active) {
- // acquire job; on exception, call returns null
- Job job = acquireJob();
- // submit job
- if (job != null) submitJob(job);
+ if (jobExecutor.waitForFreeExecutorThread()) {
+ // acquire job; on exception, call returns null
+ Job job = acquireJob();
+ // submit job
+ if (job != null) {
+ submitJob(job);
+ continue ;
+ }
+ }
// if still active, wait or sleep
if (active) {
try {
- if (job != null) {
- // reset the current retry interval
- retryInterval = jobExecutor.getRetryInterval();
- // wait for next due job
- long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
- if (waitPeriod > 0) {
- synchronized (jobExecutor) {
+ // wait for next due job
+ long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
+ if (waitPeriod > 0) {
+ synchronized (jobExecutor) {
+ if (active)
jobExecutor.wait(waitPeriod);
- }
}
}
- else {
- // sleep instead of waiting on jobExecutor
- // to prevent message/scheduler service from waking up this thread
- sleep(retryInterval);
- // after an exception, double the current retry interval
- // to avoid continuous failures during anomalous conditions
- retryInterval *= 2;
- // enforce maximum idle interval
- int maxIdleInterval = jobExecutor.getMaxIdleInterval();
- if (retryInterval > maxIdleInterval || retryInterval < 0) {
- retryInterval = maxIdleInterval;
- }
- }
}
catch (InterruptedException e) {
if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
@@ -144,12 +132,8 @@
}
private void submitJob(Job job) {
- try {
- jobExecutor.getQueue().put(job);
- }
- catch (InterruptedException e) {
+ if (!jobExecutor.submitJob(job)) {
unlockJob(job);
- if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
}
}
Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2011-05-13 15:09:03 UTC (rev 6920)
@@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -16,9 +17,13 @@
import org.apache.commons.logging.LogFactory;
import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
+import org.jbpm.db.JobSession;
+import org.jbpm.job.Job;
-import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Condition;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
public class JobExecutor implements Serializable {
@@ -39,7 +44,13 @@
protected int lockBufferTime;
private ThreadGroup threadGroup;
- private BlockingQueue queue = new SynchronousQueue();
+ private int waitingExecutorCount ;
+ private boolean waitingDispatcher ;
+ private boolean dispatcherActive ;
+ private Lock waitingExecutorLock = new ReentrantLock() ;
+ private Condition waitingExecutorCondition = waitingExecutorLock.newCondition() ;
+ private Condition waitingDispatcherCondition = waitingExecutorLock.newCondition() ;
+ private LinkedList dispatchedJobs = new LinkedList();
/** @deprecated call {@link #getThreads()} instead */
protected Map threads;
@@ -56,6 +67,8 @@
if (!isStarted) {
log.info("starting " + name);
+ activateDispatcher() ;
+
// create thread group
threadGroup = new ThreadGroup(name) {
public void uncaughtException(Thread thread, Throwable throwable) {
@@ -119,6 +132,8 @@
}
}
+ deactivateDispatcher() ;
+
// return deactivated threads
return deactivatedThreads;
}
@@ -158,10 +173,6 @@
return threadGroup;
}
- BlockingQueue getQueue() {
- return queue;
- }
-
private String getThreadName(int index) {
return name + '@' + getHostAddress() + ":Executor-" + index;
}
@@ -418,5 +429,116 @@
this.nbrOfThreads = nbrOfThreads;
}
+ private boolean hasFreeExecutor() {
+ waitingExecutorLock.lock() ;
+ try {
+ return (waitingExecutorCount > dispatchedJobs.size()) ;
+ } finally {
+ waitingExecutorLock.unlock() ;
+ }
+ }
+ // return false when interrupted
+ boolean waitForFreeExecutorThread() {
+ waitingExecutorLock.lock() ;
+ try {
+ waitingDispatcher = true ;
+ if (dispatcherActive) {
+ if (hasFreeExecutor()) {
+ return true ;
+ } else {
+ waitingDispatcherCondition.await() ;
+ return hasFreeExecutor() ;
+ }
+ }
+ } catch (final InterruptedException ie) {
+ } finally {
+ waitingDispatcher = false ;
+ waitingExecutorLock.unlock() ;
+ }
+ return false ;
+ }
+
+ // return null when interrupted
+ Job getJob() {
+ waitingExecutorLock.lock() ;
+ try {
+ waitingExecutorCount++ ;
+ if (dispatcherActive) {
+ if (waitingDispatcher && hasFreeExecutor()) {
+ waitingDispatcherCondition.signal() ;
+ }
+ if (dispatchedJobs.isEmpty()) {
+ waitingExecutorCondition.await() ;
+ }
+ if (dispatchedJobs.size() > 0) {
+ return (Job)dispatchedJobs.remove() ;
+ }
+ }
+ } catch (final InterruptedException ie) {
+ } finally {
+ waitingExecutorCount-- ;
+ waitingExecutorLock.unlock() ;
+ }
+ return null ;
+ }
+
+ boolean submitJob(final Job job) {
+ waitingExecutorLock.lock() ;
+ try {
+ if (hasFreeExecutor()) {
+ dispatchedJobs.add(job) ;
+ waitingExecutorCondition.signal() ;
+ return true ;
+ }
+ } finally {
+ waitingExecutorLock.unlock() ;
+ }
+ return false ;
+ }
+
+ private void activateDispatcher() {
+ waitingExecutorLock.lock() ;
+ try {
+ if (!dispatcherActive) {
+ unlockOurJobs() ;
+ dispatcherActive = true ;
+ }
+ } finally {
+ waitingExecutorLock.unlock() ;
+ }
+ }
+
+ private void unlockOurJobs() {
+ final JbpmContext jbpmContext = getJbpmConfiguration().createJbpmContext();
+ try {
+ final String lockOwner = getName();
+ final JobSession jobSession = jbpmContext.getJobSession();
+ jobSession.releaseLockedJobs(lockOwner);
+ } catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ if (log.isDebugEnabled()) log.debug("failed to release locked jobs", e);
+ } catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ } finally {
+ try {
+ jbpmContext.close();
+ } catch (RuntimeException e) {
+ if (log.isDebugEnabled()) log.debug("failed to release locked jobs", e);
+ }
+ }
+ }
+
+ private void deactivateDispatcher() {
+ waitingExecutorLock.lock() ;
+ try {
+ dispatcherActive = false ;
+ waitingDispatcherCondition.signal() ;
+ waitingExecutorCondition.signalAll() ;
+ } finally {
+ waitingExecutorLock.unlock() ;
+ }
+ }
+
private static Log log = LogFactory.getLog(JobExecutor.class);
}
Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2011-05-13 15:09:03 UTC (rev 6920)
@@ -7,6 +7,7 @@
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -23,6 +24,7 @@
private final JobExecutor jobExecutor;
private volatile boolean active = true;
+ private Random random = new Random() ;
public JobExecutorThread(String name, JobExecutor jobExecutor) {
super(jobExecutor.getThreadGroup(), name);
@@ -42,7 +44,7 @@
public void run() {
while (active) {
// take on next job
- Job job = acquireJob();
+ Job job = jobExecutor.getJob();
// if an exception occurs, acquireJob() returns null
if (job != null) {
try {
@@ -126,16 +128,6 @@
return jobs;
}
- private Job acquireJob() {
- try {
- return (Job) jobExecutor.getQueue().take();
- }
- catch (InterruptedException e) {
- if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
- return null;
- }
- }
-
protected void executeJob(Job job) throws Exception {
JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
try {
@@ -192,10 +184,9 @@
// unlock job so it can be dispatched again
job.setLockOwner(null);
job.setLockTime(null);
- // notify job executor
- synchronized (jobExecutor) {
- jobExecutor.notify();
- }
+ int waitPeriod = jobExecutor.getRetryInterval() / 2;
+ waitPeriod += random.nextInt(waitPeriod) ;
+ job.setDueDate(new Date(System.currentTimeMillis() + waitPeriod)) ;
}
catch (RuntimeException e) {
jbpmContext.setRollbackOnly();
@@ -213,6 +204,10 @@
log.warn("failed to save exception for " + job, e);
}
}
+ // notify job executor
+ synchronized (jobExecutor) {
+ jobExecutor.notify();
+ }
}
private void unlockJob(Job job) {
@@ -224,10 +219,6 @@
// unlock job
job.setLockOwner(null);
job.setLockTime(null);
- // notify job executor
- synchronized (jobExecutor) {
- jobExecutor.notify();
- }
}
catch (RuntimeException e) {
jbpmContext.setRollbackOnly();
@@ -246,6 +237,10 @@
log.warn("failed to unlock " + job, e);
}
}
+ // notify job executor
+ synchronized (jobExecutor) {
+ jobExecutor.notify();
+ }
}
/** @deprecated responsibility moved to DispatcherThread */
Modified: jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2011-05-13 15:09:03 UTC (rev 6920)
@@ -401,6 +401,16 @@
]]>
</query>
+ <query name="JobSession.releaseLockedJobs">
+ <![CDATA[
+ update org.jbpm.job.Job job
+ set job.lockOwner = null, job.lockTime = null
+ where (job.lockOwner = :lockOwner)
+ and job.retries > 0
+ and job.isSuspended = false
+ ]]>
+ </query>
+
<!-- related to Tasks -->
<!-- ########################### -->
More information about the jbpm-commits
mailing list