Author: alex.guizar(a)jboss.com
Date: 2009-07-28 22:01:11 -0400 (Tue, 28 Jul 2009)
New Revision: 5365
Added:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java
Removed:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java
Modified:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
Log:
[JBPM-2402] verify job executor threads in case of errors:
replace our custom thread pool with Executors.newFixedThreadPool, which starts a
replacement for any worker thread that terminates due to a failure
Modified:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java 2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -85,7 +85,7 @@
throw new JbpmException("job took too long: lock expired
"+(currentTime-lockExpiration)+"ms ago");
}
}
- } catch (Throwable exception) {
+ } catch (Exception exception) {
log.error("exception while executing '"+job+"'",
exception);
handleJobExecutionException(environment, job, exception);
} finally {
@@ -102,7 +102,7 @@
* Transaction.EVENT_AFTERCOMPLETION (after the job locks of the current transaction
are
* released). Then the command will update the job with the exception details in a
separate
* transaction. */
- protected void handleJobExecutionException(Environment environment, JobImpl<?>
job, Throwable exception) {
+ protected void handleJobExecutionException(Environment environment, JobImpl<?>
job, Exception exception) {
Transaction transaction = environment.get(Transaction.class);
// transaction.setRollbackOnly();
Modified:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java 2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -40,7 +40,7 @@
private static final Log log = Log.getLog(DispatcherThread.class.getName());
protected JobExecutor jobExecutor;
- protected boolean isActive = true;
+ protected volatile boolean isActive = true;
protected boolean checkForNewJobs;
protected int currentIdleInterval;
protected Object semaphore = new Object();
@@ -55,7 +55,7 @@
}
public void run() {
- log.info("starting...");
+ log.info("starting " + getName());
currentIdleInterval = jobExecutor.getIdleMillis();
try {
while (isActive) {
@@ -69,7 +69,7 @@
// no exception so resetting the currentIdleInterval
currentIdleInterval = jobExecutor.getIdleMillis();
if ((acquiredJobDbids != null) && (!acquiredJobDbids.isEmpty())) {
- putAcquiredJobDbidsOnQueue(acquiredJobDbids);
+ submitAcquiredJobDbids(acquiredJobDbids);
log.debug("added jobs "+acquiredJobDbids+" to the
queue");
} else if (isActive) {
@@ -88,7 +88,7 @@
}
} catch (InterruptedException e) {
- log.info((isActive ? "active" : "inactivated") + " job
dispatcher thread '" + getName() + "' got interrupted");
+ log.info((isActive ? "active" : "inactive") + " job
dispatcher thread '" + getName() + "' got interrupted");
} catch (Exception e) {
log.error("exception in job executor thread. waiting " +
currentIdleInterval + " milliseconds", e);
try {
@@ -103,24 +103,16 @@
currentIdleInterval = currentIdleInterval * 2;
}
}
- } catch (Throwable t) {
- t.printStackTrace();
} finally {
log.info(getName() + " leaves cyberspace");
}
}
- protected void putAcquiredJobDbidsOnQueue(Collection<Long> acquiredJobDbids) {
- log.debug("pushing jobs on the queue "+acquiredJobDbids);
- while (acquiredJobDbids!=null) {
- try {
- jobExecutor.getJobDbidsQueue().put(acquiredJobDbids);
- log.trace("jobs "+acquiredJobDbids+" were put on the
queue");
- acquiredJobDbids = null;
- } catch (InterruptedException e) {
- log.trace("putting acquired job dbids got interrupted. retrying...");
- }
- }
+ protected void submitAcquiredJobDbids(Collection<Long> jobDbids) {
+ log.debug("submitting jobs "+jobDbids);
+ jobExecutor.getThreadPool().submit(
+ new JobParcel(jobExecutor.getCommandExecutor(), jobDbids));
+ log.trace("jobs "+jobDbids+" were submitted");
}
protected Collection<Long> acquireJobs() {
@@ -161,13 +153,11 @@
isActive = false;
interrupt();
if (join) {
- while (isAlive()) {
- try {
- log.debug("joining "+getName());
- join();
- } catch (InterruptedException e) {
- log.trace("joining "+getName()+" got interrupted");
- }
+ try {
+ log.debug("joining "+getName());
+ join();
+ } catch (InterruptedException e) {
+ log.trace("joining "+getName()+" got interrupted");
}
}
} else {
Modified:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java 2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -23,12 +23,14 @@
import java.io.Serializable;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.jbpm.api.JbpmException;
import org.jbpm.internal.log.Log;
@@ -41,6 +43,7 @@
* @author Tom Baeyens, Guillaume Porcher
*/
public class JobExecutor implements Serializable {
+
private static final Log log = Log.getLog(JobExecutor.class.getName());
private static final long serialVersionUID = 1L;
@@ -60,16 +63,11 @@
Command<Collection<Long>> acquireJobsCommand;
Command<Date> nextDueDateCommand;
- boolean isActive = false;
+ boolean isActive;
- JobExecutorThreadPool jobExecutorThreadPool;
+ ExecutorService threadPool;
+ DispatcherThread dispatcherThread;
- DispatcherThread dispatcherThread = null;
-
- /** queue to dispatch collections of jobDbids to the JobExecutorThreads, which are
- * competing readers. */
- BlockingQueue<Collection<Long>> jobDbidsQueue = null;
-
List<JobHistoryEntry> history = new ArrayList<JobHistoryEntry>();
/** starts the {@link DispatcherThread} and {@link JobExecutorThread}s for this job
executor */
@@ -81,14 +79,9 @@
acquireJobsCommand = new AcquireJobsCmd(this);
nextDueDateCommand = new GetNextDueDateCmd(this);
- // the max capacity of the jobDbidsQueue is set to nbrOfJobExecutorThreads.
- // That way, the distpatcher thread will be stalled if enough jobs are acquired.
- jobDbidsQueue = new ArrayBlockingQueue<Collection<Long>>(nbrOfThreads,
true);
-
isActive = true;
- log.trace("starting job executor threads for job executor
'"+name+"'...");
- jobExecutorThreadPool = new JobExecutorThreadPool(this);
- jobExecutorThreadPool.start();
+ log.trace("starting thread pool for job executor
'"+name+"'...");
+ threadPool = Executors.newFixedThreadPool(nbrOfThreads);
log.trace("starting dispatcher thread for job executor
'"+name+"'...");
dispatcherThread = new DispatcherThread(this);
@@ -117,24 +110,19 @@
if (isActive) {
isActive = false;
dispatcherThread.deactivate(true);
- waitTillQueueEmpty();
- jobExecutorThreadPool.deactivate(join);
+ threadPool.shutdown();
+ if (join) {
+ try {
+ threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.trace("joining "+getName()+" got interrupted");
+ }
+ }
} else {
log.trace("ignoring stop: job executor '"+name+"' not
started");
}
}
- protected void waitTillQueueEmpty() {
- while (! jobDbidsQueue.isEmpty()) {
- log.trace("waiting for job-id-queue to become empty");
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- log.trace("waiting for job-id-queue to become empty got interrupted");
- }
- }
- }
-
public void jobWasAdded() {
if ( (dispatcherThread!=null)
&& (dispatcherThread.isActive())
@@ -143,16 +131,16 @@
}
}
- protected static String getHostName() {
+ static String getHostName() {
try {
return InetAddress.getLocalHost().getHostAddress();
- } catch (Exception e) {
- return "unknown";
+ } catch (UnknownHostException e) {
+ return "localhost";
}
}
- protected BlockingQueue<Collection<Long>> getJobDbidsQueue() {
- return jobDbidsQueue;
+ protected ExecutorService getThreadPool() {
+ return threadPool;
}
// getters //////////////////////////////////////////////////////////////////
Deleted:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java 2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -1,101 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jbpm.pvm.internal.jobexecutor;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-
-import org.jbpm.internal.log.Log;
-import org.jbpm.pvm.internal.cmd.CommandService;
-import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
-
-
-/**
- * @author Tom Baeyens, Guillaume Porcher
- */
-public class JobExecutorThread extends Thread {
-
- private static final Log log = Log.getLog(JobExecutorThread.class.getName());
-
- JobExecutor jobExecutor;
- boolean isActive = true;
-
- public JobExecutorThread(String name, JobExecutor jobExecutor) {
- super(name);
- this.jobExecutor = jobExecutor;
- }
-
- public void run() {
- log.info("starting...");
- try {
- BlockingQueue<Collection<Long>> queue =
jobExecutor.getJobDbidsQueue();
- if (queue == null) {
- log.debug("no queue to take jobs from");
- } else {
- while (isActive) {
- try {
- log.trace("taking jobs from queue");
- Collection<Long> jobDbids = null;
- jobDbids = queue.take();
- log.debug("took job(s) "+jobDbids+" from queue");
-
- for (Long jobDbid: jobDbids) {
- CommandService commandService = jobExecutor.getCommandExecutor();
- commandService.execute(new ExecuteJobCmd(jobDbid));
- }
- } catch (InterruptedException e) {
- log.trace("waiting for acquired jobs got interrupted");
- } catch (Exception e) {
- log.error("exception in job executor thread", e);
- }
- }
- }
- } catch (Throwable t) {
- t.printStackTrace();
- } finally {
- log.info(getName()+" leaves cyberspace");
- }
- }
-
- public void deactivate() {
- deactivate(false);
- }
-
- public void deactivate(boolean join) {
- if (isActive) {
- log.trace("deactivating "+getName());
- isActive = false;
- interrupt();
- if (join) {
- while (isAlive()) {
- try {
- join();
- } catch (InterruptedException e) {
- log.trace("joining "+getName()+" got interrupted");
- }
- }
- }
- } else {
- log.trace("ignoring deactivate: "+getName()+" is not active");
- }
- }
-}
Deleted:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java 2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jbpm.pvm.internal.jobexecutor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jbpm.internal.log.Log;
-
-/**
- * @author Tom Baeyens
- */
-public class JobExecutorThreadPool {
-
- private static final Log log = Log.getLog(JobExecutorThreadPool.class.getName());
-
- List<JobExecutorThread> jobExecutorThreads = new
ArrayList<JobExecutorThread>();
- JobExecutor jobExecutor;
-
- public JobExecutorThreadPool(JobExecutor jobExecutor) {
- this.jobExecutor = jobExecutor;
- }
-
- public JobExecutorThread startThread() {
- String threadName = getNextThreadName();
- JobExecutorThread jobExecutorThread = new JobExecutorThread(threadName,
jobExecutor);
- jobExecutorThreads.add(jobExecutorThread);
- log.trace("starting "+threadName);
- jobExecutorThread.start();
- return jobExecutorThread;
- }
-
- public JobExecutorThread stopThread() {
- JobExecutorThread thread = null;
- int lastIndex = jobExecutorThreads.size()-1;
- if (lastIndex>=0) {
- thread = (JobExecutorThread) jobExecutorThreads.remove(lastIndex);
- thread.deactivate();
- }
- return thread;
- }
-
- public void start() {
- for (int i=0; i<jobExecutor.getNbrOfThreads(); i++) {
- startThread();
- }
- }
-
- public void deactivate() {
- deactivate(false);
- }
-
- public void deactivate(boolean join) {
- for (JobExecutorThread jobExecutorThread : jobExecutorThreads) {
- jobExecutorThread.deactivate(join);
- }
- }
-
- protected String getNextThreadName() {
- return "JobExecutorThread" + jobExecutorThreads.size();
- }
-
- public List<JobExecutorThread> getJobExecutorThreads() {
- return jobExecutorThreads;
- }
-}
Added:
jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java (rev 0)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java 2009-07-29 02:01:11 UTC (rev 5365)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.Collection;
+
+import org.jbpm.internal.log.Log;
+import org.jbpm.pvm.internal.cmd.CommandService;
+import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
+
+/**
+ * Runnable unit of work that holds a collection of job database ids and, when
+ * run, executes an {@link ExecuteJobCmd} for each id through the given
+ * {@link CommandService}, in the collection's iteration order.
+ * <p>
+ * A {@link RuntimeException} thrown while executing a job is logged and ends
+ * the run; the ids that follow it in the collection are not executed. Other
+ * throwables are not caught here.
+ *
+ * @author Alejandro Guizar
+ */
+public class JobParcel implements Runnable {
+
+ private static final Log log = Log.getLog(JobParcel.class.getName());
+
+ private final CommandService commandService;
+ private final Collection<Long> jobDbids; // stored by reference, not copied
+
+ public JobParcel(CommandService commandService, Collection<Long> jobDbids) {
+ this.commandService = commandService;
+ this.jobDbids = jobDbids;
+ }
+
+ public void run() {
+ try { // wraps the whole loop, so the first failure skips the remaining ids
+ for (Long jobDbid : jobDbids) {
+ commandService.execute(new ExecuteJobCmd(jobDbid));
+ }
+ } catch (RuntimeException e) { // unchecked exceptions only; an Error propagates out of run()
+ log.error("exception in job block", e);
+ }
+ }
+}