[jbpm-commits] JBoss JBPM SVN: r6810 - in jbpm3/branches/jbpm-3.2-soa/core/src/main: java/org/jbpm/job/executor and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Nov 10 01:38:43 EST 2010
Author: alex.guizar at jboss.com
Date: 2010-11-10 01:38:42 -0500 (Wed, 10 Nov 2010)
New Revision: 6810
Added:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
Removed:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
Modified:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
Log:
JBPM-2959: backport the dispatcher thread from jBPM 4;
use a BlockingQueue instead of a ThreadPoolExecutor so that JobExecutorThread is not abandoned
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -41,7 +41,7 @@
public class JobSession {
- final Session session;
+ private final Session session;
public JobSession(Session session) {
this.session = session;
@@ -50,14 +50,13 @@
public Job getFirstAcquirableJob(String lockOwner) {
try {
Query query;
- if (lockOwner != null) {
+ if (lockOwner == null) {
+ query = session.getNamedQuery("JobSession.getFirstUnownedAcquirableJob");
+ }
+ else {
query = session.getNamedQuery("JobSession.getFirstAcquirableJob")
.setString("lockOwner", lockOwner);
}
- else {
- query = session.getNamedQuery("JobSession.getFirstAcquirableJobExcludingOwned");
- }
-
return (Job) query.setTimestamp("now", new Date()).setMaxResults(1).uniqueResult();
}
catch (HibernateException e) {
@@ -94,7 +93,7 @@
try {
Query query;
if (lockOwner == null) {
- query = session.getNamedQuery("JobSession.getFirstDueJobExcludingOwned");
+ query = session.getNamedQuery("JobSession.getFirstUnownedDueJob");
}
else if (monitoredJobs == null || monitoredJobs.isEmpty()) {
query = session.getNamedQuery("JobSession.getFirstDueJob")
@@ -105,7 +104,6 @@
.setString("lockOwner", lockOwner)
.setParameterList("monitoredJobIds", monitoredJobs);
}
-
return (Job) query.setMaxResults(1).uniqueResult();
}
catch (HibernateException e) {
Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+/**
+ * A thread that can be deactivated.
+ *
+ * @author Alejandro Guizar
+ */
+interface Deactivable {
+
+ /**
+ * Tells this thread to stop running. Execution will cease shortly afterwards.
+ */
+ void deactivate();
+}
Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -35,13 +35,13 @@
*
* @author Alejandro Guizar
*/
-class DispatcherThread extends Thread {
+class DispatcherThread extends Thread implements Deactivable {
+ public static final String DEFAULT_NAME = "Dispatcher";
+
private static final Log log = LogFactory.getLog(DispatcherThread.class);
- public static final String DEFAULT_NAME = "Dispatcher";
-
- private JobExecutor jobExecutor;
+ private final JobExecutor jobExecutor;
private volatile boolean active = true;
DispatcherThread(JobExecutor jobExecutor) {
@@ -49,7 +49,7 @@
}
DispatcherThread(String name, JobExecutor jobExecutor) {
- super(name);
+ super(jobExecutor.getThreadGroup(), name);
this.jobExecutor = jobExecutor;
}
@@ -61,7 +61,7 @@
// submit job
if (job != null) submitJob(job);
- // if still active wait for next job
+ // if still active, wait or sleep
if (active) {
try {
if (job != null) {
@@ -77,7 +77,7 @@
}
else {
// sleep instead of waiting on jobExecutor
- // to prevent DbMessageService from waking up this thread
+ // to prevent message/scheduler service from waking up this thread
sleep(retryInterval);
// after an exception, double the current retry interval
// to avoid continuous failures during anomalous conditions
@@ -145,11 +145,45 @@
}
private void submitJob(Job job) {
- JobParcel jobParcel = new JobParcel(jobExecutor, job);
- jobExecutor.getExecutorService().execute(jobParcel);
- if (log.isDebugEnabled()) log.debug("submitted " + job);
+ boolean debug = log.isDebugEnabled();
+ try {
+ jobExecutor.getQueue().put(job);
+ if (debug) log.debug("submitted " + job);
+ }
+ catch (InterruptedException e) {
+ unlockJob(job);
+ if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
+ }
}
+ private void unlockJob(Job job) {
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ // reattach job to persistence context
+ jbpmContext.getJobSession().reattachJob(job);
+
+ // unlock job so it can be dispatched again
+ job.setLockOwner(null);
+ job.setLockTime(null);
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ log.warn("failed to unlock " + job, e);
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
+ catch (RuntimeException e) {
+ log.warn("failed to unlock " + job, e);
+ }
+ }
+ }
+
private long getWaitPeriod(int currentIdleInterval) {
Date nextDueDate = getNextDueDate();
if (nextDueDate != null) {
@@ -193,9 +227,6 @@
return nextDueDate;
}
- /**
- * Indicates that this thread should stop running. Execution will cease shortly afterwards.
- */
public void deactivate() {
if (active) {
active = false;
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -3,27 +3,22 @@
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jbpm.JbpmConfiguration;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
-import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class JobExecutor implements Serializable {
@@ -43,13 +38,12 @@
/** @deprecated property has no effect */
protected int lockBufferTime;
- ThreadGroup threadGroup;
- private ExecutorService executorService;
+ private ThreadGroup threadGroup;
+ private BlockingQueue queue = new SynchronousQueue();
/** @deprecated call {@link #getThreads()} instead */
protected Map threads;
-
- private DispatcherThread dispatcherThread;
+ /** @deprecated call {@link #getThreads()} instead */
protected LockMonitorThread lockMonitorThread;
protected Map monitoredJobIds = new Hashtable();
@@ -65,7 +59,10 @@
// create thread group
threadGroup = new ThreadGroup(name) {
public void uncaughtException(Thread thread, Throwable throwable) {
- if (thread instanceof DispatcherThread) {
+ if (thread instanceof JobExecutorThread) {
+ startThread(thread.getName());
+ }
+ else if (thread instanceof DispatcherThread) {
startDispatcherThread();
}
else if (thread instanceof LockMonitorThread) {
@@ -75,50 +72,21 @@
}
};
- // create thread factory
- ThreadFactory threadFactory = new ThreadFactory() {
- public Thread newThread(Runnable task) {
- Thread thread = new Thread(threadGroup, task, getNextThreadName());
- if (thread.isDaemon()) thread.setDaemon(false);
- if (thread.getPriority() != Thread.NORM_PRIORITY)
- thread.setPriority(Thread.NORM_PRIORITY);
- return thread;
- }
- };
+ // start executor threads
+ for (int i = 1; i <= nbrOfThreads; i++) {
+ startThread(getThreadName(i));
+ }
- // start executor service
- executorService = new ThreadPoolExecutor(nbrOfThreads,
- nbrOfThreads,
- 0L,
- TimeUnit.MILLISECONDS,
- new SynchronousQueue(),
- threadFactory,
- JobRejectionHandler.INSTANCE);
-
- // start helper threads
+ // start control threads
startDispatcherThread();
startLockMonitorThread();
isStarted = true;
}
else if (log.isDebugEnabled()) {
- log.debug("ignoring start: " + name + " already started'");
+ log.debug("ignoring start: " + name + " already started");
}
}
- private static class JobRejectionHandler implements RejectedExecutionHandler {
-
- static final JobRejectionHandler INSTANCE = new JobRejectionHandler();
-
- public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
- try {
- executor.getQueue().put(task);
- }
- catch (InterruptedException e) {
- throw new RejectedExecutionException("queuing " + task + " got interrupted", e);
- }
- }
- }
-
/**
* tells all threads in this job executor to stop. Threads may be in the middle of processing
* a job and they will finish that first. Use {@link #stopAndJoin()} in case you want a method
@@ -133,53 +101,55 @@
return Collections.EMPTY_LIST;
}
- // list active threads
- Thread[] activeThreads = new Thread[threadGroup.activeCount()];
- threadGroup.enumerate(activeThreads);
+ log.info("stopping " + name);
+ isStarted = false;
- // tell threads to stop
- stopThreads();
+ // fetch active threads
+ Thread[] activeThreads = new Thread[nbrOfThreads + 2];
+ int activeCount = threadGroup.enumerate(activeThreads, false);
- // return formerly active threads
- return Arrays.asList(activeThreads);
- }
-
- public void stopAndJoin() throws InterruptedException {
- if (!isStarted) {
- if (log.isDebugEnabled()) log.debug("ignoring stop, " + name + " not started");
- return;
+ // deactivate threads
+ List deactivatedThreads = new ArrayList(activeCount);
+ for (int i = 0; i < activeCount; i++) {
+ Thread thread = activeThreads[i];
+ if (thread instanceof Deactivable) {
+ Deactivable deactivable = (Deactivable) thread;
+ deactivable.deactivate();
+ deactivatedThreads.add(thread);
+ }
}
- // tell threads to stop
- stopThreads();
-
- // wait for executor to terminate
- executorService.awaitTermination(1, TimeUnit.MINUTES);
-
- // join helper threads
- dispatcherThread.join();
- lockMonitorThread.join();
+ // return deactivated threads
+ return deactivatedThreads;
}
- private void stopThreads() {
- log.info("stopping " + name);
- isStarted = false;
+ public void stopAndJoin() throws InterruptedException {
+ // deactivate threads
+ List threads = stop();
- // shut down execution service
- executorService.shutdown();
-
- // deactivate helper threads
- dispatcherThread.deactivate();
- lockMonitorThread.deactivate();
+ // join deactivated threads
+ for (Iterator i = threads.iterator(); i.hasNext();) {
+ Thread thread = (Thread) i.next();
+ thread.join();
+ }
}
public void ensureThreadsAreActive() {
- // executor service looks after its own threads
- // just confirm dispatcher and lock monitor are alive
- if (!dispatcherThread.isAlive()) {
+ Map threads = getThreads();
+
+ // check executor threads
+ for (int i = 1; i <= nbrOfThreads; i++) {
+ String threadName = getThreadName(i);
+ if (!threads.containsKey(threadName)) {
+ startThread(threadName);
+ }
+ }
+
+ // check control threads
+ if (!threads.containsKey(getDispatcherThreadName())) {
startDispatcherThread();
}
- if (!lockMonitorThread.isAlive()) {
+ if (!threads.containsKey(getLockMonitorThreadName())) {
startLockMonitorThread();
}
}
@@ -188,16 +158,19 @@
return threadGroup;
}
- ExecutorService getExecutorService() {
- return executorService;
+ BlockingQueue getQueue() {
+ return queue;
}
+ private String getThreadName(int index) {
+ return name + '@' + getHostAddress() + ":Executor-" + index;
+ }
+
/** @deprecated no longer invoked */
protected void startThread() {
startThread(getNextThreadName());
}
- /** @deprecated no longer invoked */
protected void startThread(String threadName) {
Thread thread = createThread(threadName);
@@ -205,56 +178,58 @@
thread.start();
}
- /** @deprecated no longer invoked */
protected Thread createThread(String threadName) {
return new JobExecutorThread(threadName, this);
}
+ /** @deprecated no longer invoked */
protected String getNextThreadName() {
return getThreadName(threadGroup.activeCount() + 1);
}
/** @deprecated no longer invoked */
protected String getLastThreadName() {
- return getThreadName(threadGroup.activeCount());
+ Map threads = getThreads();
+ for (int i = nbrOfThreads; i > 0; i--) {
+ String threadName = getThreadName(i);
+ if (threads.containsKey(threadName)) return threadName;
+ }
+ return null;
}
/** @deprecated no longer invoked */
protected synchronized Thread stopThread() {
- String threadName = getLastThreadName();
- if (log.isDebugEnabled()) log.debug("stopping " + threadName);
-
- Thread thread = (Thread) threads.remove(threadName);
- stopThread(thread);
- return thread;
- }
-
- private void stopThread(Thread thread) {
- if (thread instanceof JobExecutorThread) {
- JobExecutorThread executor = (JobExecutorThread) thread;
- executor.deactivate();
+ Map threads = getThreads();
+ for (int i = nbrOfThreads; i > 0; i--) {
+ String threadName = getThreadName(i);
+ JobExecutorThread executorThread = (JobExecutorThread) threads.get(threadName);
+ if (executorThread != null) {
+ executorThread.deactivate();
+ return executorThread;
+ }
}
- else if (thread instanceof LockMonitorThread) {
- LockMonitorThread monitor = (LockMonitorThread) thread;
- monitor.deactivate();
- }
+ return null;
}
- private String getThreadName(int index) {
- return name + '@' + getHostAddress() + ":Executor-" + index;
+ private String getDispatcherThreadName() {
+ return name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
}
void startDispatcherThread() {
- String threadName = name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
- dispatcherThread = new DispatcherThread(threadName, this);
+ String threadName = getDispatcherThreadName();
+ Thread dispatcherThread = new DispatcherThread(threadName, this);
if (log.isDebugEnabled()) log.debug("starting " + threadName);
dispatcherThread.start();
}
+ private String getLockMonitorThreadName() {
+ return name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
+ }
+
void startLockMonitorThread() {
- String threadName = name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
- lockMonitorThread = new LockMonitorThread(threadName, this);
+ String threadName = getLockMonitorThreadName();
+ Thread lockMonitorThread = new LockMonitorThread(threadName, this);
if (log.isDebugEnabled()) log.debug("starting " + threadName);
lockMonitorThread.start();
@@ -269,8 +244,8 @@
}
}
- public Set getMonitoredJobIds() {
- return new HashSet(monitoredJobIds.values());
+ public Collection getMonitoredJobIds() {
+ return monitoredJobIds.values();
}
public void addMonitoredJobId(String threadName, long jobId) {
@@ -379,12 +354,14 @@
}
public Map getThreads() {
- Thread[] threadList = new Thread[threadGroup.activeCount()];
- int threadCount = threadGroup.enumerate(threadList);
+ // fetch active threads
+ Thread[] activeThreads = new Thread[nbrOfThreads + 2];
+ int activeCount = threadGroup.enumerate(activeThreads, false);
- Map threadMap = new HashMap(threadCount);
- for (int i = 0; i < threadCount; i++) {
- Thread thread = threadList[i];
+ // map threads by name
+ Map threadMap = new HashMap(activeCount);
+ for (int i = 0; i < activeCount; i++) {
+ Thread thread = activeThreads[i];
threadMap.put(thread.getName(), thread);
}
return threadMap;
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -18,10 +18,10 @@
import org.jbpm.persistence.db.DbPersistenceService;
import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-public class JobExecutorThread extends Thread {
+public class JobExecutorThread extends Thread implements Deactivable {
private final JobExecutor jobExecutor;
- private volatile boolean isActive = true;
+ private volatile boolean active = true;
public JobExecutorThread(String name, JobExecutor jobExecutor) {
super(jobExecutor.getThreadGroup(), name);
@@ -39,54 +39,25 @@
}
public void run() {
- int retryInterval = jobExecutor.getRetryInterval();
- while (isActive) {
- // acquire job; on exception, call returns null
+ while (active) {
+ // take on next job
Job job = acquireJob();
- // execute job
- boolean success = true;
+ // if an exception occurs, acquireJob() returns null
if (job != null) {
try {
executeJob(job);
}
catch (Exception e) {
- // on exception, call returns normally
+ // save exception stack trace
+ // if another exception occurs, it is not rethrown
saveJobException(job, e);
- success = false;
}
- }
-
- // if still active wait for next batch
- if (isActive) {
- try {
- if (success) {
- // reset the current retry interval
- retryInterval = jobExecutor.getRetryInterval();
- // wait for next due job
- long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
- if (waitPeriod > 0) {
- synchronized (jobExecutor) {
- jobExecutor.wait(waitPeriod);
- }
- }
- }
- else {
- // sleep instead of waiting on jobExecutor
- // to prevent DbMessageService from waking up this thread
- sleep(retryInterval);
- // after an exception, double the current retry interval
- // to avoid continuous failures during anomalous conditions
- retryInterval *= 2;
- // enforce maximum idle interval
- int maxIdleInterval = jobExecutor.getMaxIdleInterval();
- if (retryInterval > maxIdleInterval || retryInterval < 0) {
- retryInterval = maxIdleInterval;
- }
- }
+ catch (Error e) {
+ // unlock job so it can be dispatched again
+ // if another exception occurs, it is not rethrown
+ unlockJob(job);
+ throw e;
}
- catch (InterruptedException e) {
- if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
- }
}
}
log.info(getName() + " leaves cyberspace");
@@ -94,137 +65,91 @@
/** @deprecated call {@link #acquireJob()} instead **/
protected Collection acquireJobs() {
- boolean debug = log.isDebugEnabled();
Collection jobs;
- // acquire job executor's monitor before creating context and allocating resources
- synchronized (jobExecutor) {
- JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
- try {
- // search for available job
- String lockOwner = getName();
- JobSession jobSession = jbpmContext.getJobSession();
- Job firstJob = jobSession.getFirstAcquirableJob(lockOwner);
- // is there a job?
- if (firstJob != null) {
- // is job exclusive?
- if (firstJob.isExclusive()) {
- // find other exclusive jobs
- ProcessInstance processInstance = firstJob.getProcessInstance();
- jobs = jobSession.findExclusiveJobs(lockOwner, processInstance);
- if (debug) log.debug("acquiring exclusive " + jobs + " for " + processInstance);
- }
- else {
- jobs = Collections.singletonList(firstJob);
- if (debug) log.debug("acquiring " + firstJob);
- }
-
- // acquire jobs
- Date lockTime = new Date();
- for (Iterator i = jobs.iterator(); i.hasNext();) {
- // lock job
- Job job = (Job) i.next();
- job.setLockOwner(lockOwner);
- job.setLockTime(lockTime);
- // has job failed previously?
- if (job.getException() != null) {
- // decrease retry count
- int retries = job.getRetries() - 1;
- job.setRetries(retries);
- if (debug) log.debug(job + " has " + retries + " retries remaining");
- }
- }
- if (debug) log.debug("acquired " + jobs);
+ boolean debug = log.isDebugEnabled();
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ // search for available job
+ String lockOwner = getName();
+ JobSession jobSession = jbpmContext.getJobSession();
+ Job firstJob = jobSession.getFirstAcquirableJob(lockOwner);
+ // is there a job?
+ if (firstJob != null) {
+ // is job exclusive?
+ if (firstJob.isExclusive()) {
+ // find other exclusive jobs
+ ProcessInstance processInstance = firstJob.getProcessInstance();
+ jobs = jobSession.findExclusiveJobs(lockOwner, processInstance);
+ if (debug) log.debug("acquiring exclusive " + jobs + " for " + processInstance);
}
else {
- jobs = Collections.EMPTY_LIST;
- if (debug) log.debug("no acquirable job found");
+ jobs = Collections.singletonList(firstJob);
+ if (debug) log.debug("acquiring " + firstJob);
}
+
+ // acquire jobs
+ Date lockTime = new Date();
+ for (Iterator i = jobs.iterator(); i.hasNext();) {
+ // lock job
+ Job job = (Job) i.next();
+ job.setLockOwner(lockOwner);
+ job.setLockTime(lockTime);
+ }
+ if (debug) log.debug("acquired " + jobs);
}
+ else {
+ jobs = Collections.EMPTY_LIST;
+ if (debug) log.debug("no acquirable job found");
+ }
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ if (debug) log.debug("failed to acquire jobs", e);
+ jobs = Collections.EMPTY_LIST;
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
catch (RuntimeException e) {
- jbpmContext.setRollbackOnly();
jobs = Collections.EMPTY_LIST;
if (debug) log.debug("failed to acquire jobs", e);
}
- catch (Error e) {
- jbpmContext.setRollbackOnly();
- throw e;
- }
- finally {
- try {
- jbpmContext.close();
- }
- catch (RuntimeException e) {
- jobs = Collections.EMPTY_LIST;
- if (debug) log.debug("failed to acquire jobs", e);
- }
- }
}
return jobs;
}
protected Job acquireJob() {
- boolean debug = log.isDebugEnabled();
- Job job;
- // acquire job executor's monitor before creating context and allocating resources
- synchronized (jobExecutor) {
- JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
- try {
- // search for available job
- String lockOwner = getName();
- JobSession jobSession = jbpmContext.getJobSession();
- job = jobSession.getFirstAcquirableJob(lockOwner);
- // is there a job?
- if (job != null) {
- // lock job
- job.setLockOwner(lockOwner);
- job.setLockTime(new Date());
- // has job failed previously?
- if (job.getException() != null) {
- // decrease retry count
- int retries = job.getRetries() - 1;
- job.setRetries(retries);
- if (debug) log.debug(job + " has " + retries + " retries remaining");
- }
- if (debug) log.debug("acquired " + job);
- }
- else if (debug) log.debug("no acquirable job found");
- }
- catch (RuntimeException e) {
- jbpmContext.setRollbackOnly();
- job = null;
- if (debug) log.debug("failed to acquire job", e);
- }
- catch (Error e) {
- jbpmContext.setRollbackOnly();
- throw e;
- }
- finally {
- try {
- jbpmContext.close();
- }
- catch (RuntimeException e) {
- job = null;
- if (debug) log.debug("failed to acquire job", e);
- }
- }
+ try {
+ return (Job) jobExecutor.getQueue().take();
}
- return job;
+ catch (InterruptedException e) {
+ if (log.isDebugEnabled()) log.debug(getName() + " got interrupted", e);
+ return null;
+ }
}
protected void executeJob(Job job) throws Exception {
JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
try {
- if (job.isExclusive()) {
- jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
- }
-
+ // reattach job to persistence context
JobSession jobSession = jbpmContext.getJobSession();
jobSession.reattachJob(job);
-
+
// register process instance for automatic save
// https://jira.jboss.org/browse/JBPM-1015
- jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
+ ProcessInstance processInstance = job.getProcessInstance();
+ jbpmContext.addAutoSaveProcessInstance(processInstance);
+ // if job is exclusive, lock process instance
+ if (job.isExclusive()) {
+ jbpmContext.getGraphSession().lockProcessInstance(processInstance);
+ }
+
if (log.isDebugEnabled()) log.debug("executing " + job);
if (job.execute(jbpmContext)) jobSession.deleteJob(job);
}
@@ -251,21 +176,27 @@
log.error("failed to execute " + job, exception);
}
- boolean debug = log.isDebugEnabled();
JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
try {
// do not reattach existing job as it contains undesired updates
- JobSession jobSession = jbpmContext.getJobSession();
- job = jobSession.loadJob(job.getId());
+ jbpmContext.getSession().refresh(job);
// print and save exception
StringWriter out = new StringWriter();
exception.printStackTrace(new PrintWriter(out));
job.setException(out.toString());
+
+ // unlock job so it can be dispatched again
+ job.setLockOwner(null);
+ job.setLockTime(null);
+ // notify job executor
+ synchronized (jobExecutor) {
+ jobExecutor.notify();
+ }
}
catch (RuntimeException e) {
jbpmContext.setRollbackOnly();
- if (debug) log.debug("failed to save job exception", e);
+ log.warn("failed to save exception for " + job, e);
}
catch (Error e) {
jbpmContext.setRollbackOnly();
@@ -276,11 +207,45 @@
jbpmContext.close();
}
catch (RuntimeException e) {
- if (debug) log.debug("failed to save job exception", e);
+ log.warn("failed to save exception for " + job, e);
}
}
}
+ private void unlockJob(Job job) {
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ // do not reattach existing job as it contains undesired updates
+ jbpmContext.getSession().refresh(job);
+
+ // unlock job
+ job.setLockOwner(null);
+ job.setLockTime(null);
+ // notify job executor
+ synchronized (jobExecutor) {
+ jobExecutor.notify();
+ }
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ log.warn("failed to unlock " + job, e);
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ // do not rethrow as this method is already called in response to an Error
+ log.warn("failed to unlock " + job, e);
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
+ catch (RuntimeException e) {
+ log.warn("failed to unlock " + job, e);
+ }
+ }
+ }
+
+ /** @deprecated moved to {@link DispatcherThread} */
protected long getWaitPeriod(int currentIdleInterval) {
Date nextDueDate = getNextDueDate();
if (nextDueDate != null) {
@@ -290,6 +255,7 @@
return currentIdleInterval;
}
+ /** @deprecated moved to {@link DispatcherThread} */
protected Date getNextDueDate() {
Date nextDueDate;
JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
@@ -334,13 +300,9 @@
if (isActive == false) deactivate();
}
- /**
- * Indicates that this thread should stop running. Execution will cease
- * shortly afterwards.
- */
public void deactivate() {
- if (isActive) {
- isActive = false;
+ if (active) {
+ active = false;
interrupt();
}
}
Deleted: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -1,141 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.job.executor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmContext;
-import org.jbpm.db.JobSession;
-import org.jbpm.job.Job;
-import org.jbpm.persistence.db.DbPersistenceService;
-import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-
-/**
- * @author Alejandro Guizar
- */
-class JobParcel implements Runnable {
-
- private static final Log log = LogFactory.getLog(JobParcel.class);
-
- private final JobExecutor jobExecutor;
- private final Job job;
-
- JobParcel(JobExecutor jobExecutor, Job job) {
- this.jobExecutor = jobExecutor;
- this.job = job;
- }
-
- public void run() {
- try {
- executeJob();
- }
- catch (Exception e) {
- // on exception, call returns normally
- saveJobException(e);
- }
- }
-
- private void executeJob() throws Exception {
- JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
- try {
- if (job.isExclusive()) {
- jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
- }
-
- JobSession jobSession = jbpmContext.getJobSession();
- jobSession.reattachJob(job);
-
- // register process instance for automatic save
- // https://jira.jboss.org/browse/JBPM-1015
- jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
-
- if (log.isDebugEnabled()) log.debug("executing " + job);
- if (job.execute(jbpmContext)) jobSession.deleteJob(job);
- }
- catch (Exception e) {
- jbpmContext.setRollbackOnly();
- throw e;
- }
- catch (Error e) {
- jbpmContext.setRollbackOnly();
- throw e;
- }
- finally {
- jbpmContext.close();
- }
- }
-
- private void saveJobException(Exception exception) {
- // if this is a locking exception, keep it quiet
- if (DbPersistenceService.isLockingException(exception)) {
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
- .error("failed to execute " + job, exception);
- }
- else {
- log.error("failed to execute " + job, exception);
- }
-
- boolean debug = log.isDebugEnabled();
- JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
- try {
- // do not reattach existing job as it contains undesired updates
- JobSession jobSession = jbpmContext.getJobSession();
- Job job = jobSession.loadJob(this.job.getId());
-
- // print and save exception
- StringWriter out = new StringWriter();
- exception.printStackTrace(new PrintWriter(out));
- job.setException(out.toString());
-
- // unlock job so it can be dispatched again
- job.setLockOwner(null);
- // notify job executor
- synchronized (jobExecutor) {
- jobExecutor.notify();
- }
- }
- catch (RuntimeException e) {
- jbpmContext.setRollbackOnly();
- if (debug) log.debug("failed to save job exception", e);
- }
- catch (Error e) {
- jbpmContext.setRollbackOnly();
- throw e;
- }
- finally {
- try {
- jbpmContext.close();
- }
- catch (RuntimeException e) {
- if (debug) log.debug("failed to save job exception", e);
- }
- }
- }
-
- public String toString() {
- return "JobParcel(" + job + ')';
- }
-}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -13,12 +13,12 @@
import org.jbpm.persistence.db.DbPersistenceService;
import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-public class LockMonitorThread extends Thread {
+public class LockMonitorThread extends Thread implements Deactivable {
public static final String DEFAULT_NAME = "Monitor";
private final JobExecutor jobExecutor;
- private volatile boolean isActive = true;
+ private volatile boolean active = true;
public LockMonitorThread(JobExecutor jobExecutor) {
this(DEFAULT_NAME, jobExecutor);
@@ -30,8 +30,7 @@
}
/**
- * @deprecated As of jBPM 3.2.6, replaced by
- * {@link #LockMonitorThread(JobExecutor)}
+ * @deprecated As of jBPM 3.2.6, replaced by {@link #LockMonitorThread(JobExecutor)}
*/
public LockMonitorThread(JbpmConfiguration jbpmConfiguration, int lockMonitorInterval,
int maxLockTime, int lockBufferTime) {
@@ -39,14 +38,14 @@
}
public void run() {
- while (isActive) {
+ while (active) {
try {
unlockOverdueJobs();
}
catch (RuntimeException e) {
log.error("exception in " + getName(), e);
}
- if (isActive) {
+ if (active) {
try {
sleep(jobExecutor.getLockMonitorInterval());
}
@@ -101,13 +100,9 @@
if (isActive == false) deactivate();
}
- /**
- * Indicates that this thread should stop running. Execution will cease
- * shortly afterwards.
- */
public void deactivate() {
- if (isActive) {
- isActive = false;
+ if (active) {
+ active = false;
interrupt();
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -23,7 +23,6 @@
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
import org.jbpm.job.Job;
import org.jbpm.job.executor.JobExecutor;
import org.jbpm.msg.MessageService;
@@ -32,27 +31,25 @@
private static final long serialVersionUID = 1L;
- private final JobSession jobSession;
- private final JobExecutor jobExecutor;
+ private final JbpmContext jbpmContext;
private boolean hasProducedJobs;
public DbMessageService() {
- JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+ jbpmContext = JbpmContext.getCurrentJbpmContext();
if (jbpmContext == null) throw new JbpmException("no active jbpm context");
-
- jobSession = jbpmContext.getJobSession();
- jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
}
public void send(Job job) {
- jobSession.saveJob(job);
+ jbpmContext.getJobSession().saveJob(job);
hasProducedJobs = true;
}
public void close() {
// if messages were produced
- if (hasProducedJobs && jobExecutor != null) {
- // notify job executor
+ if (!hasProducedJobs) return;
+ // notify job executor
+ JobExecutor jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
+ if (jobExecutor != null) {
synchronized (jobExecutor) {
jobExecutor.notify();
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java 2010-11-10 06:38:42 UTC (rev 6810)
@@ -35,7 +35,7 @@
private static final long serialVersionUID = 1L;
private final JbpmContext jbpmContext;
- private boolean hasProducedJobs;
+ private boolean hasProducedTimers;
public DbSchedulerService() {
jbpmContext = JbpmContext.getCurrentJbpmContext();
@@ -51,7 +51,7 @@
}
jbpmContext.getJobSession().saveJob(timer);
- hasProducedJobs = true;
+ hasProducedTimers = true;
}
public void deleteTimer(Timer timer) {
@@ -67,9 +67,7 @@
}
public void close() {
- // if no timers were produced, just return
- if (!hasProducedJobs) return;
-
+ if (!hasProducedTimers) return;
// notify job executor
JobExecutor jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
if (jobExecutor != null) {
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2010-11-10 06:38:42 UTC (rev 6810)
@@ -262,7 +262,7 @@
]]>
</query>
- <query name="JobSession.getFirstAcquirableJobExcludingOwned">
+ <query name="JobSession.getFirstUnownedAcquirableJob">
<![CDATA[
select job
from org.jbpm.job.Job job
@@ -311,7 +311,7 @@
]]>
</query>
- <query name="JobSession.getFirstDueJobExcludingOwned">
+ <query name="JobSession.getFirstUnownedDueJob">
<![CDATA[
select job
from org.jbpm.job.Job job
More information about the jbpm-commits
mailing list