[jbpm-commits] JBoss JBPM SVN: r6809 - in jbpm3/branches/jbpm-3.2-soa/core: src/main/java/org/jbpm/db and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Nov 9 16:05:07 EST 2010
Author: alex.guizar at jboss.com
Date: 2010-11-09 16:05:06 -0500 (Tue, 09 Nov 2010)
New Revision: 6809
Added:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
Modified:
jbpm3/branches/jbpm-3.2-soa/core/pom.xml
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java
jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java
jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
Log:
JBPM-2959 backport dispatcher thread from jBPM 4;
switch to ThreadPoolExecutor is not backwards compatible and does not restart fallen threads quickly
Modified: jbpm3/branches/jbpm-3.2-soa/core/pom.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/pom.xml 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/pom.xml 2010-11-09 21:05:06 UTC (rev 6809)
@@ -27,6 +27,12 @@
<dependencies>
<!-- Compile Dependencies -->
<dependency>
+ <groupId>backport-util-concurrent</groupId>
+ <artifactId>backport-util-concurrent</artifactId>
+ <version>3.1</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>bsh</groupId>
<artifactId>bsh</artifactId>
<optional>true</optional>
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -212,7 +212,7 @@
/**
* Waits until all jobs are processed or a specified amount of time has elapsed. Unlike
- * {@link #processJobs(long)}, this method does not concern itself with the job executor or
+ * {@link #processJobs(long)}, this method is not concerned with the job executor or
* the jBPM context.
*/
protected void waitForJobs(final long timeout) {
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -49,11 +49,16 @@
public Job getFirstAcquirableJob(String lockOwner) {
try {
- return (Job) session.getNamedQuery("JobSession.getFirstAcquirableJob")
- .setString("lockOwner", lockOwner)
- .setTimestamp("now", new Date())
- .setMaxResults(1)
- .uniqueResult();
+ Query query;
+ if (lockOwner != null) {
+ query = session.getNamedQuery("JobSession.getFirstAcquirableJob")
+ .setString("lockOwner", lockOwner);
+ }
+ else {
+ query = session.getNamedQuery("JobSession.getFirstAcquirableJobExcludingOwned");
+ }
+
+ return (Job) query.setTimestamp("now", new Date()).setMaxResults(1).uniqueResult();
}
catch (HibernateException e) {
throw new JbpmPersistenceException("could not get first acquirable job", e);
@@ -88,14 +93,20 @@
public Job getFirstDueJob(String lockOwner, Collection monitoredJobs) {
try {
Query query;
- if (monitoredJobs == null || monitoredJobs.isEmpty()) {
- query = session.getNamedQuery("JobSession.getFirstDueJob");
+ if (lockOwner == null) {
+ query = session.getNamedQuery("JobSession.getFirstDueJobExcludingOwned");
}
+ else if (monitoredJobs == null || monitoredJobs.isEmpty()) {
+ query = session.getNamedQuery("JobSession.getFirstDueJob")
+ .setString("lockOwner", lockOwner);
+ }
else {
- query = session.getNamedQuery("JobSession.getFirstDueJobExcludingMonitoredJobs");
- query.setParameterList("monitoredJobIds", monitoredJobs);
+ query = session.getNamedQuery("JobSession.getFirstDueJobExcludingMonitoredJobs")
+ .setString("lockOwner", lockOwner)
+ .setParameterList("monitoredJobIds", monitoredJobs);
}
- return (Job) query.setString("lockOwner", lockOwner).setMaxResults(1).uniqueResult();
+
+ return (Job) query.setMaxResults(1).uniqueResult();
}
catch (HibernateException e) {
throw new JbpmPersistenceException("could not get first due job owned by '" + lockOwner
@@ -109,10 +120,8 @@
if (job instanceof Timer) {
Timer timer = (Timer) job;
Action action = timer.getAction();
- if (action != null && action.getId() == 0) {
- // transient action, save it
- session.save(action);
- }
+ // if action is transient, save it
+ if (action != null && action.getId() == 0L) session.save(action);
}
}
catch (HibernateException e) {
@@ -255,8 +264,8 @@
.list();
}
catch (HibernateException e) {
- throw new JbpmPersistenceException("could not find jobs with lock time over " +
- threshold, e);
+ throw new JbpmPersistenceException("could not find jobs with lock time over " + threshold,
+ e);
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -128,8 +128,7 @@
*/
public void take(ExecutionContext executionContext) {
if (condition != null && isConditionEnforced) {
- Boolean result = (Boolean) JbpmExpressionEvaluator.evaluate(condition, executionContext,
- Boolean.class);
+ Boolean result = (Boolean) JbpmExpressionEvaluator.evaluate(condition, executionContext, Boolean.class);
if (!Boolean.TRUE.equals(result)) {
throw new JbpmException("condition '" + condition + "' guarding " + this + " not met");
}
@@ -140,7 +139,8 @@
token.setNode(null);
// start the transition log
- TransitionLog transitionLog = new TransitionLog(this, executionContext.getTransitionSource());
+ TransitionLog transitionLog = new TransitionLog(this,
+ executionContext.getTransitionSource());
token.startCompositeLog(transitionLog);
try {
// fire leave events for superstates (if any)
@@ -248,6 +248,12 @@
return result;
}
+ public String toString() {
+ String label = name != null ? name : (from != null && to != null) ? from + "->" + to
+ : id != 0 ? String.valueOf(id) : '@' + Integer.toHexString(hashCode());
+ return "Transition(" + label + ')'; // parenthesis is now closed for every variant
+ }
+
// other
// ///////////////////////////////////////////////////////////////////////////
Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.JbpmContext;
+import org.jbpm.job.Job;
+
+/**
+ * Thread responsible for acquiring jobs and then dispatching them to one of the threads in the
+ * job executor thread pool.
+ *
+ * @author Alejandro Guizar
+ */
+class DispatcherThread extends Thread {
+
+ private static final Log log = LogFactory.getLog(DispatcherThread.class);
+
+ public static final String DEFAULT_NAME = "Dispatcher"; // used when no explicit thread name is given
+
+ private JobExecutor jobExecutor;
+ private volatile boolean active = true; // cleared by deactivate() to end the run loop
+
+ DispatcherThread(JobExecutor jobExecutor) {
+ this(DEFAULT_NAME, jobExecutor);
+ }
+
+ DispatcherThread(String name, JobExecutor jobExecutor) {
+ super(name);
+ this.jobExecutor = jobExecutor;
+ }
+
+ public void run() {
+ int retryInterval = jobExecutor.getRetryInterval();
+ while (active) {
+ // acquire job; call returns null if no job is acquirable or an exception occurred
+ Job job = acquireJob();
+ // submit job
+ if (job != null) submitJob(job);
+
+ // if still active wait for next job
+ if (active) {
+ try {
+ if (job != null) {
+ // reset the current retry interval
+ retryInterval = jobExecutor.getRetryInterval();
+ // wait for next due job
+ long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
+ if (waitPeriod > 0) {
+ synchronized (jobExecutor) {
+ jobExecutor.wait(waitPeriod);
+ }
+ }
+ }
+ else {
+ // sleep instead of waiting on jobExecutor
+ // to prevent DbMessageService from waking up this thread
+ sleep(retryInterval);
+ // no job was acquired (none available or an exception); double the retry interval
+ // to avoid continuous failures during anomalous conditions
+ retryInterval *= 2;
+ // enforce maximum idle interval; a negative value means the doubling overflowed
+ int maxIdleInterval = jobExecutor.getMaxIdleInterval();
+ if (retryInterval > maxIdleInterval || retryInterval < 0) {
+ retryInterval = maxIdleInterval;
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
+ }
+ }
+ }
+ log.info(getName() + " leaves cyberspace");
+ }
+
+ private Job acquireJob() {
+ boolean debug = log.isDebugEnabled();
+ Job job;
+ // acquire job executor's monitor before creating context and allocating resources
+ synchronized (jobExecutor) {
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ // look for available job; no lock owner is passed
+ job = jbpmContext.getJobSession().getFirstAcquirableJob(null);
+ // is there a job?
+ if (job != null) {
+ // lock job under this thread's name
+ job.setLockOwner(getName());
+ job.setLockTime(new Date());
+ // has job failed previously?
+ if (job.getException() != null) {
+ // decrease retry count
+ int retries = job.getRetries() - 1;
+ job.setRetries(retries);
+ if (debug) log.debug(job + " has " + retries + " retries remaining");
+ }
+ if (debug) log.debug("acquired " + job);
+ }
+ else if (debug) log.debug("no acquirable job found");
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ job = null;
+ if (debug) log.debug("failed to acquire job", e);
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
+ catch (RuntimeException e) {
+ job = null; // closing the context failed, so do not dispatch the job
+ if (debug) log.debug("failed to acquire job", e);
+ }
+ }
+ }
+ return job;
+ }
+
+ private void submitJob(Job job) {
+ JobParcel jobParcel = new JobParcel(jobExecutor, job);
+ jobExecutor.getExecutorService().execute(jobParcel);
+ if (log.isDebugEnabled()) log.debug("submitted " + job);
+ }
+
+ private long getWaitPeriod(int currentIdleInterval) {
+ Date nextDueDate = getNextDueDate();
+ if (nextDueDate != null) {
+ long waitPeriod = nextDueDate.getTime() - System.currentTimeMillis();
+ if (waitPeriod < currentIdleInterval) return waitPeriod; // may be <= 0 when a job is already due; run() then skips the wait
+ }
+ return currentIdleInterval;
+ }
+
+ private Date getNextDueDate() {
+ Date nextDueDate;
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ Job job = jbpmContext.getJobSession().getFirstDueJob(null, null); // no lock owner, no monitored jobs
+ if (job != null) {
+ nextDueDate = job.getDueDate();
+ }
+ else {
+ nextDueDate = null;
+ if (log.isDebugEnabled()) log.debug("no due job found");
+ }
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ nextDueDate = null;
+ if (log.isDebugEnabled()) log.debug("failed to determine next due date", e);
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
+ catch (RuntimeException e) {
+ nextDueDate = null;
+ if (log.isDebugEnabled()) log.debug("failed to determine next due date", e);
+ }
+ }
+ return nextDueDate;
+ }
+
+ /**
+ * Indicates that this thread should stop running. Clears the active flag and interrupts the
+ * thread so that a pending wait or sleep in the run loop ends early.
+ */
+ public void deactivate() {
+ if (active) {
+ active = false;
+ interrupt();
+ }
+ }
+}
Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -8,7 +8,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -18,6 +17,14 @@
import org.jbpm.JbpmConfiguration;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
+import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
+import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
public class JobExecutor implements Serializable {
private static final long serialVersionUID = 1L;
@@ -36,10 +43,13 @@
/** @deprecated property has no effect */
protected int lockBufferTime;
- private ThreadGroup threadGroup;
+ ThreadGroup threadGroup;
+ private ExecutorService executorService;
+
/** @deprecated call {@link #getThreads()} instead */
protected Map threads;
- /** @deprecated call {@link #getThreads()} instead */
+
+ private DispatcherThread dispatcherThread;
protected LockMonitorThread lockMonitorThread;
protected Map monitoredJobIds = new Hashtable();
@@ -55,8 +65,8 @@
// create thread group
threadGroup = new ThreadGroup(name) {
public void uncaughtException(Thread thread, Throwable throwable) {
- if (thread instanceof JobExecutorThread) {
- startThread(thread.getName());
+ if (thread instanceof DispatcherThread) {
+ startDispatcherThread();
}
else if (thread instanceof LockMonitorThread) {
startLockMonitorThread();
@@ -65,11 +75,28 @@
}
};
- // start executor threads
- for (int i = 0; i < nbrOfThreads; i++) {
- startThread();
- }
+ // create thread factory
+ ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable task) {
+ Thread thread = new Thread(threadGroup, task, getNextThreadName());
+ if (thread.isDaemon()) thread.setDaemon(false);
+ if (thread.getPriority() != Thread.NORM_PRIORITY)
+ thread.setPriority(Thread.NORM_PRIORITY);
+ return thread;
+ }
+ };
+ // start executor service
+ executorService = new ThreadPoolExecutor(nbrOfThreads,
+ nbrOfThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new SynchronousQueue(),
+ threadFactory,
+ JobRejectionHandler.INSTANCE);
+
+ // start helper threads
+ startDispatcherThread();
startLockMonitorThread();
isStarted = true;
}
@@ -78,10 +105,24 @@
}
}
+ private static class JobRejectionHandler implements RejectedExecutionHandler {
+ // hands a rejected task to the executor's queue via a blocking put instead of dropping it
+ static final JobRejectionHandler INSTANCE = new JobRejectionHandler();
+ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(task);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status for the submitting thread
+ throw new RejectedExecutionException("queuing " + task + " got interrupted", e);
+ }
+ }
+ }
+
/**
- * signals to all threads in this job executor to stop. Threads may be in the middle of
- * processing a job and they will finish that first. Use {@link #stopAndJoin()} in case you
- * want a method that blocks until all the threads are actually finished.
+ * tells all threads in this job executor to stop. Threads may be in the middle of processing
+ * a job and they will finish that first. Use {@link #stopAndJoin()} in case you want a method
+ * that blocks until all the threads are actually finished.
*
* @return a list of the stopped threads. In case no threads were stopped an empty list will
* be returned.
@@ -92,57 +133,71 @@
return Collections.EMPTY_LIST;
}
- log.info("stopping " + name);
- isStarted = false;
-
+ // list active threads
Thread[] activeThreads = new Thread[threadGroup.activeCount()];
- int threadCount = threadGroup.enumerate(activeThreads);
- for (int i = 0; i < threadCount; i++) {
- stopThread(activeThreads[i]);
- }
+ threadGroup.enumerate(activeThreads);
+ // tell threads to stop
+ stopThreads();
+
+ // return formerly active threads
return Arrays.asList(activeThreads);
}
public void stopAndJoin() throws InterruptedException {
- for (Iterator i = stop().iterator(); i.hasNext();) {
- Thread thread = (Thread) i.next();
- thread.join();
+ if (!isStarted) {
+ if (log.isDebugEnabled()) log.debug("ignoring stop, " + name + " not started");
+ return;
}
+
+ // tell threads to stop
+ stopThreads();
+
+ // wait for executor to terminate
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+ // join helper threads
+ dispatcherThread.join();
+ lockMonitorThread.join();
}
- public void ensureThreadsAreActive() {
- int activeCount = threadGroup.activeCount();
- if (activeCount < nbrOfThreads + 1) {
- // find out who is missing
- Thread[] activeThreads = new Thread[activeCount];
- activeCount = threadGroup.enumerate(activeThreads);
+ private void stopThreads() {
+ log.info("stopping " + name);
+ isStarted = false;
- for (int i = 1; i <= nbrOfThreads; i++) {
- String threadName = getThreadName(i);
- if (!contains(activeThreads, activeCount, threadName)) {
- // thread-i is missing, restart it
- startThread(threadName);
- }
- }
- }
+ // shut down execution service
+ executorService.shutdown();
+
+ // deactivate helper threads
+ dispatcherThread.deactivate();
+ lockMonitorThread.deactivate();
}
- private static boolean contains(Thread[] threads, int count, String threadName) {
- for (int i = 0; i < count; i++) {
- if (threadName.equals(threads[i].getName())) return true;
+ public void ensureThreadsAreActive() {
+ // executor service looks after its own threads
+ // just confirm dispatcher and lock monitor are alive
+ if (!dispatcherThread.isAlive()) {
+ startDispatcherThread();
}
- return false;
+ if (!lockMonitorThread.isAlive()) {
+ startLockMonitorThread();
+ }
}
ThreadGroup getThreadGroup() {
return threadGroup;
}
+ ExecutorService getExecutorService() { // package-private: DispatcherThread submits JobParcels through it
+ return executorService;
+ }
+
+ /** @deprecated no longer invoked */
protected void startThread() {
startThread(getNextThreadName());
}
+ /** @deprecated no longer invoked */
protected void startThread(String threadName) {
Thread thread = createThread(threadName);
@@ -150,6 +205,7 @@
thread.start();
}
+ /** @deprecated no longer invoked */
protected Thread createThread(String threadName) {
return new JobExecutorThread(threadName, this);
}
@@ -158,11 +214,12 @@
return getThreadName(threadGroup.activeCount() + 1);
}
+ /** @deprecated no longer invoked */
protected String getLastThreadName() {
return getThreadName(threadGroup.activeCount());
}
- /** @deprecated */
+ /** @deprecated no longer invoked */
protected synchronized Thread stopThread() {
String threadName = getLastThreadName();
if (log.isDebugEnabled()) log.debug("stopping " + threadName);
@@ -187,18 +244,22 @@
return name + '@' + getHostAddress() + ":Executor-" + index;
}
+ void startDispatcherThread() { // also called to replace a dispatcher that died or is no longer alive
+ String threadName = name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
+ dispatcherThread = new DispatcherThread(threadName, this);
+
+ if (log.isDebugEnabled()) log.debug("starting " + threadName);
+ dispatcherThread.start();
+ }
+
void startLockMonitorThread() {
- String threadName = getLockMonitorThreadName();
- Thread lockMonitorThread = new LockMonitorThread(threadName, this);
+ String threadName = name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
+ lockMonitorThread = new LockMonitorThread(threadName, this);
if (log.isDebugEnabled()) log.debug("starting " + threadName);
lockMonitorThread.start();
}
- private String getLockMonitorThreadName() {
- return name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
- }
-
private static String getHostAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -175,11 +175,9 @@
job = jobSession.getFirstAcquirableJob(lockOwner);
// is there a job?
if (job != null) {
- // acquire jobs
- Date lockTime = new Date();
// lock job
job.setLockOwner(lockOwner);
- job.setLockTime(lockTime);
+ job.setLockTime(new Date());
// has job failed previously?
if (job.getException() != null) {
// decrease retry count
Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.JbpmContext;
+import org.jbpm.db.JobSession;
+import org.jbpm.job.Job;
+import org.jbpm.persistence.db.DbPersistenceService;
+import org.jbpm.persistence.db.StaleObjectLogConfigurer;
+
+/**
+ * @author Alejandro Guizar
+ */
+class JobParcel implements Runnable {
+
+ private static final Log log = LogFactory.getLog(JobParcel.class);
+
+ private final JobExecutor jobExecutor; // supplies the jBPM configuration; also the monitor the dispatcher waits on
+ private final Job job; // the job handed over by the dispatcher
+
+ JobParcel(JobExecutor jobExecutor, Job job) {
+ this.jobExecutor = jobExecutor;
+ this.job = job;
+ }
+
+ public void run() {
+ try {
+ executeJob();
+ }
+ catch (Exception e) {
+ // on exception, record it on the job; call returns normally
+ saveJobException(e);
+ }
+ }
+
+ private void executeJob() throws Exception {
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ if (job.isExclusive()) {
+ jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
+ }
+
+ JobSession jobSession = jbpmContext.getJobSession();
+ jobSession.reattachJob(job);
+
+ // register process instance for automatic save
+ // https://jira.jboss.org/browse/JBPM-1015
+ jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
+
+ if (log.isDebugEnabled()) log.debug("executing " + job);
+ if (job.execute(jbpmContext)) jobSession.deleteJob(job);
+ }
+ catch (Exception e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ jbpmContext.close();
+ }
+ }
+
+ private void saveJobException(Exception exception) {
+ // if this is a locking exception, keep it quiet
+ if (DbPersistenceService.isLockingException(exception)) {
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
+ .error("failed to execute " + job, exception);
+ }
+ else {
+ log.error("failed to execute " + job, exception);
+ }
+
+ boolean debug = log.isDebugEnabled();
+ JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+ try {
+ // do not reattach existing job as it contains undesired updates
+ JobSession jobSession = jbpmContext.getJobSession();
+ Job persistentJob = jobSession.loadJob(this.job.getId());
+
+ // print and save exception
+ StringWriter out = new StringWriter();
+ exception.printStackTrace(new PrintWriter(out));
+ persistentJob.setException(out.toString());
+
+ // unlock job so it can be dispatched again
+ persistentJob.setLockOwner(null);
+ }
+ catch (RuntimeException e) {
+ jbpmContext.setRollbackOnly();
+ if (debug) log.debug("failed to save job exception", e);
+ }
+ catch (Error e) {
+ jbpmContext.setRollbackOnly();
+ throw e;
+ }
+ finally {
+ try {
+ jbpmContext.close();
+ }
+ catch (RuntimeException e) {
+ if (debug) log.debug("failed to save job exception", e);
+ }
+ }
+ // notify job executor only after the context is closed, so a woken dispatcher does not query before the unlock is written
+ synchronized (jobExecutor) {
+ jobExecutor.notify();
+ }
+ }
+
+ public String toString() {
+ return "JobParcel(" + job + ')';
+ }
+}
Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -34,7 +34,7 @@
* manner.
*
* @author Alejandro Guizar
- * @deprecated no use for this class
+ * @deprecated not in use anymore
*/
public class CustomLoaderObjectInputStream extends ObjectInputStream {
@@ -73,7 +73,8 @@
* security-sensitive methods; note that this class does <em>not</em>
* override said methods
*/
- public CustomLoaderObjectInputStream(InputStream in, ClassLoader customLoader) throws IOException {
+ public CustomLoaderObjectInputStream(InputStream in, ClassLoader customLoader)
+ throws IOException {
super(in);
if (customLoader == null) {
throw new IllegalArgumentException("custom class loader is null");
@@ -88,7 +89,8 @@
return customLoader;
}
- protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ protected Class resolveClass(ObjectStreamClass desc) throws IOException,
+ ClassNotFoundException {
try {
return super.resolveClass(desc);
}
@@ -97,7 +99,8 @@
}
}
- protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ protected Class resolveProxyClass(String[] interfaces) throws IOException,
+ ClassNotFoundException {
try {
return super.resolveProxyClass(interfaces);
}
@@ -119,11 +122,11 @@
}
try {
return Proxy.getProxyClass(nonPublicLoader != null ? nonPublicLoader : customLoader,
- classes);
+ classes);
}
catch (IllegalArgumentException iae) {
throw new ClassNotFoundException("could not get proxy class for interfaces: "
- + ArrayUtil.toString(classes), e);
+ + ArrayUtil.toString(classes), e);
}
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -23,6 +23,7 @@
import org.hibernate.proxy.HibernateProxy;
+/** @deprecated not in use anymore */
public class EqualsUtil {
private EqualsUtil() {
@@ -32,12 +33,14 @@
/**
* hack to support comparing hibernate proxies against the real objects.
* since it falls back to ==, clients don't need to override hashcode.
+ *
* @deprecated hack does not work
* @see <a href="https://jira.jboss.org/jira/browse/JBPM-2489">JBPM-2489</a>
*/
public static boolean equals(Object thisObject, Object otherObject) {
- return thisObject == otherObject || otherObject instanceof HibernateProxy
- && otherObject.equals(thisObject);
+ return thisObject == otherObject
+ || otherObject instanceof HibernateProxy
+ && otherObject.equals(thisObject);
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -3,12 +3,16 @@
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
+/** @deprecated not in use anymore */
public class StringUtil implements Serializable {
private static final long serialVersionUID = 1L;
- static final byte[] HEX_CHAR_TABLE = { (byte) '0', (byte) '1', (byte) '2', (byte) '3', (byte) '4', (byte) '5', (byte) '6', (byte) '7', (byte) '8',
- (byte) '9', (byte) 'a', (byte) 'b', (byte) 'c', (byte) 'd', (byte) 'e', (byte) 'f' };
+ static final byte[] HEX_CHAR_TABLE = {
+ (byte) '0', (byte) '1', (byte) '2', (byte) '3', (byte) '4', (byte) '5', (byte) '6',
+ (byte) '7', (byte) '8', (byte) '9', (byte) 'a', (byte) 'b', (byte) 'c', (byte) 'd',
+ (byte) 'e', (byte) 'f'
+ };
private StringUtil() {
// hide default constructor to prevent instantiation
@@ -19,27 +23,27 @@
byte[] hex = new byte[2 * bytes.length];
int index = 0;
- for (int i=0; i<bytes.length; i++) {
+ for (int i = 0; i < bytes.length; i++) {
byte b = bytes[i];
int v = b & 0xFF;
hex[index++] = HEX_CHAR_TABLE[v >>> 4];
hex[index++] = HEX_CHAR_TABLE[v & 0xF];
}
return new String(hex, "US-ASCII");
- } catch (UnsupportedEncodingException e) {
+ }
+ catch (UnsupportedEncodingException e) {
// should not happen, US-ASCII is a standard charset
throw new AssertionError(e);
}
}
-
+
public static String toHexStringHibernate(byte[] bytes) {
StringBuffer buf = new StringBuffer();
- for ( int i=0; i<bytes.length; i++ ) {
- String hexStr = Integer.toHexString( bytes[i] - Byte.MIN_VALUE );
- if ( hexStr.length()==1 ) buf.append('0');
- buf.append(hexStr);
+ for (int i = 0; i < bytes.length; i++) {
+ String hexStr = Integer.toHexString(bytes[i] - Byte.MIN_VALUE);
+ if (hexStr.length() == 1) buf.append('0');
+ buf.append(hexStr);
}
return buf.toString();
}
-
}
\ No newline at end of file
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -24,7 +24,9 @@
import org.jbpm.JbpmException;
public class XmlException extends JbpmException {
+
private static final long serialVersionUID = 1L;
+
public XmlException(String message, Throwable cause) {
super(message, cause);
}
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml 2010-11-09 21:05:06 UTC (rev 6809)
@@ -262,6 +262,18 @@
]]>
</query>
+ <query name="JobSession.getFirstAcquirableJobExcludingOwned">
+ <![CDATA[
+ select job
+ from org.jbpm.job.Job job
+ where job.lockOwner is null
+ and job.retries > 0
+ and job.dueDate <= :now
+ and job.isSuspended = false
+ order by job.dueDate asc
+ ]]>
+ </query>
+
<query name="JobSession.findExclusiveJobs">
<![CDATA[
select job
@@ -299,6 +311,17 @@
]]>
</query>
+ <query name="JobSession.getFirstDueJobExcludingOwned">
+ <![CDATA[
+ select job
+ from org.jbpm.job.Job job
+ where job.lockOwner is null
+ and job.retries > 0
+ and job.isSuspended = false
+ order by job.dueDate asc
+ ]]>
+ </query>
+
<query name="JobSession.suspendJobs">
<![CDATA[
update org.jbpm.job.Job job
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -65,7 +65,10 @@
"org.jbpm.instantiation.FieldInstantiator",
"org.jbpm.instantiation.ProcessClassLoader",
"org.jbpm.instantiation.XmlInstantiator",
+ "org.jbpm.job.executor.DispatcherThread",
"org.jbpm.job.executor.JobExecutorThread",
+ "org.jbpm.job.executor.JobExecutor$JobRejectionHandler",
+ "org.jbpm.job.executor.JobParcel",
"org.jbpm.job.executor.LockMonitorThread",
"org.jbpm.jpdl.convert.Converter",
"org.jbpm.jpdl.el.",
Modified: jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java 2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java 2010-11-09 21:05:06 UTC (rev 6809)
@@ -48,14 +48,13 @@
deployProcessDefinition(processDefinition);
}
- // check if the process ends correctly if no Error is thrown
+ /** check if the process ends correctly if no Error is thrown */
public void testTimerWithoutErrorAction() {
throwError = false;
runTimerErrorAction();
}
- // check if the process ends correctly if an Error is thrown in the
- // ActionHandler
+ /** check if the process ends correctly if an Error is thrown in the ActionHandler */
public void testTimerWithErrorAction() {
throwError = true;
runTimerErrorAction();
@@ -67,9 +66,8 @@
processInstance.signal();
processJobs();
-
processInstance = jbpmContext.loadProcessInstance(processInstance.getId());
- assert processInstance.hasEnded() : processInstance;
+ assert processInstance.hasEnded() : "expected " + processInstance + " to have ended";
}
public static class TimerExceptionAction implements ActionHandler {
More information about the jbpm-commits mailing list