[jbpm-commits] JBoss JBPM SVN: r6810 - in jbpm3/branches/jbpm-3.2-soa/core/src/main: java/org/jbpm/job/executor and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 10 01:38:43 EST 2010


Author: alex.guizar at jboss.com
Date: 2010-11-10 01:38:42 -0500 (Wed, 10 Nov 2010)
New Revision: 6810

Added:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
Removed:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
Modified:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
Log:
JBPM-2959 backport dispatcher thread from jBPM 4;
employ BlockingQueue instead of ThreadPoolExecutor to not abandon JobExecutorThread

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -41,7 +41,7 @@
 
 public class JobSession {
 
-  final Session session;
+  private final Session session;
 
   public JobSession(Session session) {
     this.session = session;
@@ -50,14 +50,13 @@
   public Job getFirstAcquirableJob(String lockOwner) {
     try {
       Query query;
-      if (lockOwner != null) {
+      if (lockOwner == null) {
+        query = session.getNamedQuery("JobSession.getFirstUnownedAcquirableJob");
+      }
+      else {
         query = session.getNamedQuery("JobSession.getFirstAcquirableJob")
           .setString("lockOwner", lockOwner);
       }
-      else {
-        query = session.getNamedQuery("JobSession.getFirstAcquirableJobExcludingOwned");
-      }
-
       return (Job) query.setTimestamp("now", new Date()).setMaxResults(1).uniqueResult();
     }
     catch (HibernateException e) {
@@ -94,7 +93,7 @@
     try {
       Query query;
       if (lockOwner == null) {
-        query = session.getNamedQuery("JobSession.getFirstDueJobExcludingOwned");
+        query = session.getNamedQuery("JobSession.getFirstUnownedDueJob");
       }
       else if (monitoredJobs == null || monitoredJobs.isEmpty()) {
         query = session.getNamedQuery("JobSession.getFirstDueJob")
@@ -105,7 +104,6 @@
           .setString("lockOwner", lockOwner)
           .setParameterList("monitoredJobIds", monitoredJobs);
       }
-
       return (Job) query.setMaxResults(1).uniqueResult();
     }
     catch (HibernateException e) {

Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+/**
+ * A thread that can be deactivated.
+ * 
+ * @author Alejandro Guizar
+ */
+interface Deactivable {
+
+  /**
+   * Tells this thread to stop running. Execution will cease shortly afterwards.
+   */
+  void deactivate();
+}


Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/Deactivable.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -35,13 +35,13 @@
  * 
  * @author Alejandro Guizar
  */
-class DispatcherThread extends Thread {
+class DispatcherThread extends Thread implements Deactivable {
 
+  public static final String DEFAULT_NAME = "Dispatcher";
+
   private static final Log log = LogFactory.getLog(DispatcherThread.class);
 
-  public static final String DEFAULT_NAME = "Dispatcher";
-
-  private JobExecutor jobExecutor;
+  private final JobExecutor jobExecutor;
   private volatile boolean active = true;
 
   DispatcherThread(JobExecutor jobExecutor) {
@@ -49,7 +49,7 @@
   }
 
   DispatcherThread(String name, JobExecutor jobExecutor) {
-    super(name);
+    super(jobExecutor.getThreadGroup(), name);
     this.jobExecutor = jobExecutor;
   }
 
@@ -61,7 +61,7 @@
       // submit job
       if (job != null) submitJob(job);
 
-      // if still active wait for next job
+      // if still active, wait or sleep
       if (active) {
         try {
           if (job != null) {
@@ -77,7 +77,7 @@
           }
           else {
             // sleep instead of waiting on jobExecutor
-            // to prevent DbMessageService from waking up this thread
+            // to prevent message/scheduler service from waking up this thread
             sleep(retryInterval);
             // after an exception, double the current retry interval
             // to avoid continuous failures during anomalous conditions
@@ -145,11 +145,45 @@
   }
 
   private void submitJob(Job job) {
-    JobParcel jobParcel = new JobParcel(jobExecutor, job);
-    jobExecutor.getExecutorService().execute(jobParcel);
-    if (log.isDebugEnabled()) log.debug("submitted " + job);
+    boolean debug = log.isDebugEnabled();
+    try {
+      jobExecutor.getQueue().put(job);
+      if (debug) log.debug("submitted " + job);
+    }
+    catch (InterruptedException e) {
+      unlockJob(job);
+      if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
+    }
   }
 
+  private void unlockJob(Job job) {
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      // reattach job to persistence context
+      jbpmContext.getJobSession().reattachJob(job);
+
+      // unlock job so it can be dispatched again
+      job.setLockOwner(null);
+      job.setLockTime(null);
+    }
+    catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      log.warn("failed to unlock " + job, e);
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    finally {
+      try {
+        jbpmContext.close();
+      }
+      catch (RuntimeException e) {
+        log.warn("failed to unlock " + job, e);
+      }
+    }
+  }
+
   private long getWaitPeriod(int currentIdleInterval) {
     Date nextDueDate = getNextDueDate();
     if (nextDueDate != null) {
@@ -193,9 +227,6 @@
     return nextDueDate;
   }
 
-  /**
-   * Indicates that this thread should stop running. Execution will cease shortly afterwards.
-   */
   public void deactivate() {
     if (active) {
       active = false;

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -3,27 +3,22 @@
 import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.jbpm.JbpmConfiguration;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
-import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
-import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 public class JobExecutor implements Serializable {
 
@@ -43,13 +38,12 @@
   /** @deprecated property has no effect */
   protected int lockBufferTime;
 
-  ThreadGroup threadGroup;
-  private ExecutorService executorService;
+  private ThreadGroup threadGroup;
+  private BlockingQueue queue = new SynchronousQueue();
 
   /** @deprecated call {@link #getThreads()} instead */
   protected Map threads;
-
-  private DispatcherThread dispatcherThread;
+  /** @deprecated call {@link #getThreads()} instead */
   protected LockMonitorThread lockMonitorThread;
 
   protected Map monitoredJobIds = new Hashtable();
@@ -65,7 +59,10 @@
       // create thread group
       threadGroup = new ThreadGroup(name) {
         public void uncaughtException(Thread thread, Throwable throwable) {
-          if (thread instanceof DispatcherThread) {
+          if (thread instanceof JobExecutorThread) {
+            startThread(thread.getName());
+          }
+          else if (thread instanceof DispatcherThread) {
             startDispatcherThread();
           }
           else if (thread instanceof LockMonitorThread) {
@@ -75,50 +72,21 @@
         }
       };
 
-      // create thread factory
-      ThreadFactory threadFactory = new ThreadFactory() {
-        public Thread newThread(Runnable task) {
-          Thread thread = new Thread(threadGroup, task, getNextThreadName());
-          if (thread.isDaemon()) thread.setDaemon(false);
-          if (thread.getPriority() != Thread.NORM_PRIORITY)
-            thread.setPriority(Thread.NORM_PRIORITY);
-          return thread;
-        }
-      };
+      // start executor threads
+      for (int i = 1; i <= nbrOfThreads; i++) {
+        startThread(getThreadName(i));
+      }
 
-      // start executor service
-      executorService = new ThreadPoolExecutor(nbrOfThreads,
-        nbrOfThreads,
-        0L,
-        TimeUnit.MILLISECONDS,
-        new SynchronousQueue(),
-        threadFactory,
-        JobRejectionHandler.INSTANCE);
-
-      // start helper threads
+      // start control threads
       startDispatcherThread();
       startLockMonitorThread();
       isStarted = true;
     }
     else if (log.isDebugEnabled()) {
-      log.debug("ignoring start: " + name + " already started'");
+      log.debug("ignoring start: " + name + " already started");
     }
   }
 
-  private static class JobRejectionHandler implements RejectedExecutionHandler {
-
-    static final JobRejectionHandler INSTANCE = new JobRejectionHandler();
-
-    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
-      try {
-        executor.getQueue().put(task);
-      }
-      catch (InterruptedException e) {
-        throw new RejectedExecutionException("queuing " + task + " got interrupted", e);
-      }
-    }
-  }
-
   /**
    * tells all threads in this job executor to stop. Threads may be in the middle of processing
    * a job and they will finish that first. Use {@link #stopAndJoin()} in case you want a method
@@ -133,53 +101,55 @@
       return Collections.EMPTY_LIST;
     }
 
-    // list active threads
-    Thread[] activeThreads = new Thread[threadGroup.activeCount()];
-    threadGroup.enumerate(activeThreads);
+    log.info("stopping " + name);
+    isStarted = false;
 
-    // tell threads to stop
-    stopThreads();
+    // fetch active threads
+    Thread[] activeThreads = new Thread[nbrOfThreads + 2];
+    int activeCount = threadGroup.enumerate(activeThreads, false);
 
-    // return formerly active threads
-    return Arrays.asList(activeThreads);
-  }
-
-  public void stopAndJoin() throws InterruptedException {
-    if (!isStarted) {
-      if (log.isDebugEnabled()) log.debug("ignoring stop, " + name + " not started");
-      return;
+    // deactivate threads
+    List deactivatedThreads = new ArrayList(activeCount);
+    for (int i = 0; i < activeCount; i++) {
+      Thread thread = activeThreads[i];
+      if (thread instanceof Deactivable) {
+        Deactivable deactivable = (Deactivable) thread;
+        deactivable.deactivate();
+        deactivatedThreads.add(thread);
+      }
     }
 
-    // tell threads to stop
-    stopThreads();
-
-    // wait for executor to terminate
-    executorService.awaitTermination(1, TimeUnit.MINUTES);
-
-    // join helper threads
-    dispatcherThread.join();
-    lockMonitorThread.join();
+    // return deactivated threads
+    return deactivatedThreads;
   }
 
-  private void stopThreads() {
-    log.info("stopping " + name);
-    isStarted = false;
+  public void stopAndJoin() throws InterruptedException {
+    // deactivate threads
+    List threads = stop();
 
-    // shut down execution service
-    executorService.shutdown();
-
-    // deactivate helper threads
-    dispatcherThread.deactivate();
-    lockMonitorThread.deactivate();
+    // join deactivated threads
+    for (Iterator i = threads.iterator(); i.hasNext();) {
+      Thread thread = (Thread) i.next();
+      thread.join();
+    }
   }
 
   public void ensureThreadsAreActive() {
-    // executor service looks after its own threads
-    // just confirm dispatcher and lock monitor are alive
-    if (!dispatcherThread.isAlive()) {
+    Map threads = getThreads();
+
+    // check executor threads
+    for (int i = 1; i <= nbrOfThreads; i++) {
+      String threadName = getThreadName(i);
+      if (!threads.containsKey(threadName)) {
+        startThread(threadName);
+      }
+    }
+
+    // check control threads
+    if (!threads.containsKey(getDispatcherThreadName())) {
       startDispatcherThread();
     }
-    if (!lockMonitorThread.isAlive()) {
+    if (!threads.containsKey(getLockMonitorThreadName())) {
       startLockMonitorThread();
     }
   }
@@ -188,16 +158,19 @@
     return threadGroup;
   }
 
-  ExecutorService getExecutorService() {
-    return executorService;
+  BlockingQueue getQueue() {
+    return queue;
   }
 
+  private String getThreadName(int index) {
+    return name + '@' + getHostAddress() + ":Executor-" + index;
+  }
+
   /** @deprecated no longer invoked */
   protected void startThread() {
     startThread(getNextThreadName());
   }
 
-  /** @deprecated no longer invoked */
   protected void startThread(String threadName) {
     Thread thread = createThread(threadName);
 
@@ -205,56 +178,58 @@
     thread.start();
   }
 
-  /** @deprecated no longer invoked */
   protected Thread createThread(String threadName) {
     return new JobExecutorThread(threadName, this);
   }
 
+  /** @deprecated no longer invoked */
   protected String getNextThreadName() {
     return getThreadName(threadGroup.activeCount() + 1);
   }
 
   /** @deprecated no longer invoked */
   protected String getLastThreadName() {
-    return getThreadName(threadGroup.activeCount());
+    Map threads = getThreads();
+    for (int i = nbrOfThreads; i > 0; i--) {
+      String threadName = getThreadName(i);
+      if (threads.containsKey(threadName)) return threadName;
+    }
+    return null;
   }
 
   /** @deprecated no longer invoked */
   protected synchronized Thread stopThread() {
-    String threadName = getLastThreadName();
-    if (log.isDebugEnabled()) log.debug("stopping " + threadName);
-
-    Thread thread = (Thread) threads.remove(threadName);
-    stopThread(thread);
-    return thread;
-  }
-
-  private void stopThread(Thread thread) {
-    if (thread instanceof JobExecutorThread) {
-      JobExecutorThread executor = (JobExecutorThread) thread;
-      executor.deactivate();
+    Map threads = getThreads();
+    for (int i = nbrOfThreads; i > 0; i--) {
+      String threadName = getThreadName(i);
+      JobExecutorThread executorThread = (JobExecutorThread) threads.get(threadName);
+      if (executorThread != null) {
+        executorThread.deactivate();
+        return executorThread;
+      }
     }
-    else if (thread instanceof LockMonitorThread) {
-      LockMonitorThread monitor = (LockMonitorThread) thread;
-      monitor.deactivate();
-    }
+    return null;
   }
 
-  private String getThreadName(int index) {
-    return name + '@' + getHostAddress() + ":Executor-" + index;
+  private String getDispatcherThreadName() {
+    return name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
   }
 
   void startDispatcherThread() {
-    String threadName = name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
-    dispatcherThread = new DispatcherThread(threadName, this);
+    String threadName = getDispatcherThreadName();
+    Thread dispatcherThread = new DispatcherThread(threadName, this);
 
     if (log.isDebugEnabled()) log.debug("starting " + threadName);
     dispatcherThread.start();
   }
 
+  private String getLockMonitorThreadName() {
+    return name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
+  }
+
   void startLockMonitorThread() {
-    String threadName = name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
-    lockMonitorThread = new LockMonitorThread(threadName, this);
+    String threadName = getLockMonitorThreadName();
+    Thread lockMonitorThread = new LockMonitorThread(threadName, this);
 
     if (log.isDebugEnabled()) log.debug("starting " + threadName);
     lockMonitorThread.start();
@@ -269,8 +244,8 @@
     }
   }
 
-  public Set getMonitoredJobIds() {
-    return new HashSet(monitoredJobIds.values());
+  public Collection getMonitoredJobIds() {
+    return monitoredJobIds.values();
   }
 
   public void addMonitoredJobId(String threadName, long jobId) {
@@ -379,12 +354,14 @@
   }
 
   public Map getThreads() {
-    Thread[] threadList = new Thread[threadGroup.activeCount()];
-    int threadCount = threadGroup.enumerate(threadList);
+    // fetch active threads
+    Thread[] activeThreads = new Thread[nbrOfThreads + 2];
+    int activeCount = threadGroup.enumerate(activeThreads, false);
 
-    Map threadMap = new HashMap(threadCount);
-    for (int i = 0; i < threadCount; i++) {
-      Thread thread = threadList[i];
+    // map threads by name
+    Map threadMap = new HashMap(activeCount);
+    for (int i = 0; i < activeCount; i++) {
+      Thread thread = activeThreads[i];
       threadMap.put(thread.getName(), thread);
     }
     return threadMap;

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -18,10 +18,10 @@
 import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
 
-public class JobExecutorThread extends Thread {
+public class JobExecutorThread extends Thread implements Deactivable {
 
   private final JobExecutor jobExecutor;
-  private volatile boolean isActive = true;
+  private volatile boolean active = true;
 
   public JobExecutorThread(String name, JobExecutor jobExecutor) {
     super(jobExecutor.getThreadGroup(), name);
@@ -39,54 +39,25 @@
   }
 
   public void run() {
-    int retryInterval = jobExecutor.getRetryInterval();
-    while (isActive) {
-      // acquire job; on exception, call returns null
+    while (active) {
+      // take on next job
       Job job = acquireJob();
-      // execute job
-      boolean success = true;
+      // if an exception occurs, acquireJob() returns null
       if (job != null) {
         try {
           executeJob(job);
         }
         catch (Exception e) {
-          // on exception, call returns normally
+          // save exception stack trace
+          // if another exception occurs, it is not rethrown
           saveJobException(job, e);
-          success = false;
         }
-      }
-
-      // if still active wait for next batch
-      if (isActive) {
-        try {
-          if (success) {
-            // reset the current retry interval
-            retryInterval = jobExecutor.getRetryInterval();
-            // wait for next due job
-            long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
-            if (waitPeriod > 0) {
-              synchronized (jobExecutor) {
-                jobExecutor.wait(waitPeriod);
-              }
-            }
-          }
-          else {
-            // sleep instead of waiting on jobExecutor
-            // to prevent DbMessageService from waking up this thread
-            sleep(retryInterval);
-            // after an exception, double the current retry interval
-            // to avoid continuous failures during anomalous conditions
-            retryInterval *= 2;
-            // enforce maximum idle interval
-            int maxIdleInterval = jobExecutor.getMaxIdleInterval();
-            if (retryInterval > maxIdleInterval || retryInterval < 0) {
-              retryInterval = maxIdleInterval;
-            }
-          }
+        catch (Error e) {
+          // unlock job so it can be dispatched again
+          // if another exception occurs, it is not rethrown
+          unlockJob(job);
+          throw e;
         }
-        catch (InterruptedException e) {
-          if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
-        }
       }
     }
     log.info(getName() + " leaves cyberspace");
@@ -94,137 +65,91 @@
 
   /** @deprecated call {@link #acquireJob()} instead **/
   protected Collection acquireJobs() {
-    boolean debug = log.isDebugEnabled();
     Collection jobs;
-    // acquire job executor's monitor before creating context and allocating resources
-    synchronized (jobExecutor) {
-      JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
-      try {
-        // search for available job
-        String lockOwner = getName();
-        JobSession jobSession = jbpmContext.getJobSession();
-        Job firstJob = jobSession.getFirstAcquirableJob(lockOwner);
-        // is there a job?
-        if (firstJob != null) {
-          // is job exclusive?
-          if (firstJob.isExclusive()) {
-            // find other exclusive jobs
-            ProcessInstance processInstance = firstJob.getProcessInstance();
-            jobs = jobSession.findExclusiveJobs(lockOwner, processInstance);
-            if (debug) log.debug("acquiring exclusive " + jobs + " for " + processInstance);
-          }
-          else {
-            jobs = Collections.singletonList(firstJob);
-            if (debug) log.debug("acquiring " + firstJob);
-          }
-
-          // acquire jobs
-          Date lockTime = new Date();
-          for (Iterator i = jobs.iterator(); i.hasNext();) {
-            // lock job
-            Job job = (Job) i.next();
-            job.setLockOwner(lockOwner);
-            job.setLockTime(lockTime);
-            // has job failed previously?
-            if (job.getException() != null) {
-              // decrease retry count
-              int retries = job.getRetries() - 1;
-              job.setRetries(retries);
-              if (debug) log.debug(job + " has " + retries + " retries remaining");
-            }
-          }
-          if (debug) log.debug("acquired " + jobs);
+    boolean debug = log.isDebugEnabled();
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      // search for available job
+      String lockOwner = getName();
+      JobSession jobSession = jbpmContext.getJobSession();
+      Job firstJob = jobSession.getFirstAcquirableJob(lockOwner);
+      // is there a job?
+      if (firstJob != null) {
+        // is job exclusive?
+        if (firstJob.isExclusive()) {
+          // find other exclusive jobs
+          ProcessInstance processInstance = firstJob.getProcessInstance();
+          jobs = jobSession.findExclusiveJobs(lockOwner, processInstance);
+          if (debug) log.debug("acquiring exclusive " + jobs + " for " + processInstance);
         }
         else {
-          jobs = Collections.EMPTY_LIST;
-          if (debug) log.debug("no acquirable job found");
+          jobs = Collections.singletonList(firstJob);
+          if (debug) log.debug("acquiring " + firstJob);
         }
+
+        // acquire jobs
+        Date lockTime = new Date();
+        for (Iterator i = jobs.iterator(); i.hasNext();) {
+          // lock job
+          Job job = (Job) i.next();
+          job.setLockOwner(lockOwner);
+          job.setLockTime(lockTime);
+        }
+        if (debug) log.debug("acquired " + jobs);
       }
+      else {
+        jobs = Collections.EMPTY_LIST;
+        if (debug) log.debug("no acquirable job found");
+      }
+    }
+    catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      if (debug) log.debug("failed to acquire jobs", e);
+      jobs = Collections.EMPTY_LIST;
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    finally {
+      try {
+        jbpmContext.close();
+      }
       catch (RuntimeException e) {
-        jbpmContext.setRollbackOnly();
         jobs = Collections.EMPTY_LIST;
         if (debug) log.debug("failed to acquire jobs", e);
       }
-      catch (Error e) {
-        jbpmContext.setRollbackOnly();
-        throw e;
-      }
-      finally {
-        try {
-          jbpmContext.close();
-        }
-        catch (RuntimeException e) {
-          jobs = Collections.EMPTY_LIST;
-          if (debug) log.debug("failed to acquire jobs", e);
-        }
-      }
     }
     return jobs;
   }
 
   protected Job acquireJob() {
-    boolean debug = log.isDebugEnabled();
-    Job job;
-    // acquire job executor's monitor before creating context and allocating resources
-    synchronized (jobExecutor) {
-      JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
-      try {
-        // search for available job
-        String lockOwner = getName();
-        JobSession jobSession = jbpmContext.getJobSession();
-        job = jobSession.getFirstAcquirableJob(lockOwner);
-        // is there a job?
-        if (job != null) {
-          // lock job
-          job.setLockOwner(lockOwner);
-          job.setLockTime(new Date());
-          // has job failed previously?
-          if (job.getException() != null) {
-            // decrease retry count
-            int retries = job.getRetries() - 1;
-            job.setRetries(retries);
-            if (debug) log.debug(job + " has " + retries + " retries remaining");
-          }
-          if (debug) log.debug("acquired " + job);
-        }
-        else if (debug) log.debug("no acquirable job found");
-      }
-      catch (RuntimeException e) {
-        jbpmContext.setRollbackOnly();
-        job = null;
-        if (debug) log.debug("failed to acquire job", e);
-      }
-      catch (Error e) {
-        jbpmContext.setRollbackOnly();
-        throw e;
-      }
-      finally {
-        try {
-          jbpmContext.close();
-        }
-        catch (RuntimeException e) {
-          job = null;
-          if (debug) log.debug("failed to acquire job", e);
-        }
-      }
+    try {
+      return (Job) jobExecutor.getQueue().take();
     }
-    return job;
+    catch (InterruptedException e) {
+      if (log.isDebugEnabled()) log.debug(getName() + " got interrupted", e);
+      return null;
+    }
   }
 
   protected void executeJob(Job job) throws Exception {
     JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
     try {
-      if (job.isExclusive()) {
-        jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
-      }
-
+      // reattach job to persistence context
       JobSession jobSession = jbpmContext.getJobSession();
       jobSession.reattachJob(job);
-
+      
       // register process instance for automatic save
       // https://jira.jboss.org/browse/JBPM-1015
-      jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
+      ProcessInstance processInstance = job.getProcessInstance();
+      jbpmContext.addAutoSaveProcessInstance(processInstance);
 
+      // if job is exclusive, lock process instance
+      if (job.isExclusive()) {
+        jbpmContext.getGraphSession().lockProcessInstance(processInstance);
+      }
+
       if (log.isDebugEnabled()) log.debug("executing " + job);
       if (job.execute(jbpmContext)) jobSession.deleteJob(job);
     }
@@ -251,21 +176,27 @@
       log.error("failed to execute " + job, exception);
     }
 
-    boolean debug = log.isDebugEnabled();
     JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
     try {
       // do not reattach existing job as it contains undesired updates
-      JobSession jobSession = jbpmContext.getJobSession();
-      job = jobSession.loadJob(job.getId());
+      jbpmContext.getSession().refresh(job);
 
       // print and save exception
       StringWriter out = new StringWriter();
       exception.printStackTrace(new PrintWriter(out));
       job.setException(out.toString());
+
+      // unlock job so it can be dispatched again
+      job.setLockOwner(null);
+      job.setLockTime(null);
+      // notify job executor
+      synchronized (jobExecutor) {
+        jobExecutor.notify();
+      }
     }
     catch (RuntimeException e) {
       jbpmContext.setRollbackOnly();
-      if (debug) log.debug("failed to save job exception", e);
+      log.warn("failed to save exception for " + job, e);
     }
     catch (Error e) {
       jbpmContext.setRollbackOnly();
@@ -276,11 +207,45 @@
         jbpmContext.close();
       }
       catch (RuntimeException e) {
-        if (debug) log.debug("failed to save job exception", e);
+        log.warn("failed to save exception for " + job, e);
       }
     }
   }
 
+  private void unlockJob(Job job) {
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      // do not reattach existing job as it contains undesired updates
+      jbpmContext.getSession().refresh(job);
+
+      // unlock job
+      job.setLockOwner(null);
+      job.setLockTime(null);
+      // notify job executor
+      synchronized (jobExecutor) {
+        jobExecutor.notify();
+      }
+    }
+    catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      log.warn("failed to unlock " + job, e);
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      // do not rethrow as this method is already called in response to an Error
+      log.warn("failed to unlock " + job, e);
+    }
+    finally {
+      try {
+        jbpmContext.close();
+      }
+      catch (RuntimeException e) {
+        log.warn("failed to unlock " + job, e);
+      }
+    }
+  }
+
+  /** @deprecated moved to {@link DispatcherThread} */
   protected long getWaitPeriod(int currentIdleInterval) {
     Date nextDueDate = getNextDueDate();
     if (nextDueDate != null) {
@@ -290,6 +255,7 @@
     return currentIdleInterval;
   }
 
+  /** @deprecated moved to {@link DispatcherThread} */
   protected Date getNextDueDate() {
     Date nextDueDate;
     JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
@@ -334,13 +300,9 @@
     if (isActive == false) deactivate();
   }
 
-  /**
-   * Indicates that this thread should stop running. Execution will cease
-   * shortly afterwards.
-   */
   public void deactivate() {
-    if (isActive) {
-      isActive = false;
+    if (active) {
+      active = false;
       interrupt();
     }
   }

Deleted: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -1,141 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.job.executor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmContext;
-import org.jbpm.db.JobSession;
-import org.jbpm.job.Job;
-import org.jbpm.persistence.db.DbPersistenceService;
-import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-
-/**
- * @author Alejandro Guizar
- */
-class JobParcel implements Runnable {
-
-  private static final Log log = LogFactory.getLog(JobParcel.class);
-
-  private final JobExecutor jobExecutor;
-  private final Job job;
-
-  JobParcel(JobExecutor jobExecutor, Job job) {
-    this.jobExecutor = jobExecutor;
-    this.job = job;
-  }
-
-  public void run() {
-    try {
-      executeJob();
-    }
-    catch (Exception e) {
-      // on exception, call returns normally
-      saveJobException(e);
-    }
-  }
-
-  private void executeJob() throws Exception {
-    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
-    try {
-      if (job.isExclusive()) {
-        jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
-      }
-
-      JobSession jobSession = jbpmContext.getJobSession();
-      jobSession.reattachJob(job);
-
-      // register process instance for automatic save
-      // https://jira.jboss.org/browse/JBPM-1015
-      jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
-
-      if (log.isDebugEnabled()) log.debug("executing " + job);
-      if (job.execute(jbpmContext)) jobSession.deleteJob(job);
-    }
-    catch (Exception e) {
-      jbpmContext.setRollbackOnly();
-      throw e;
-    }
-    catch (Error e) {
-      jbpmContext.setRollbackOnly();
-      throw e;
-    }
-    finally {
-      jbpmContext.close();
-    }
-  }
-
-  private void saveJobException(Exception exception) {
-    // if this is a locking exception, keep it quiet
-    if (DbPersistenceService.isLockingException(exception)) {
-      StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
-        .error("failed to execute " + job, exception);
-    }
-    else {
-      log.error("failed to execute " + job, exception);
-    }
-
-    boolean debug = log.isDebugEnabled();
-    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
-    try {
-      // do not reattach existing job as it contains undesired updates
-      JobSession jobSession = jbpmContext.getJobSession();
-      Job job = jobSession.loadJob(this.job.getId());
-
-      // print and save exception
-      StringWriter out = new StringWriter();
-      exception.printStackTrace(new PrintWriter(out));
-      job.setException(out.toString());
-
-      // unlock job so it can be dispatched again
-      job.setLockOwner(null);
-      // notify job executor
-      synchronized (jobExecutor) {
-        jobExecutor.notify();
-      }
-    }
-    catch (RuntimeException e) {
-      jbpmContext.setRollbackOnly();
-      if (debug) log.debug("failed to save job exception", e);
-    }
-    catch (Error e) {
-      jbpmContext.setRollbackOnly();
-      throw e;
-    }
-    finally {
-      try {
-        jbpmContext.close();
-      }
-      catch (RuntimeException e) {
-        if (debug) log.debug("failed to save job exception", e);
-      }
-    }
-  }
-
-  public String toString() {
-    return "JobParcel(" + job + ')';
-  }
-}

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -13,12 +13,12 @@
 import org.jbpm.persistence.db.DbPersistenceService;
 import org.jbpm.persistence.db.StaleObjectLogConfigurer;
 
-public class LockMonitorThread extends Thread {
+public class LockMonitorThread extends Thread implements Deactivable {
 
   public static final String DEFAULT_NAME = "Monitor";
 
   private final JobExecutor jobExecutor;
-  private volatile boolean isActive = true;
+  private volatile boolean active = true;
 
   public LockMonitorThread(JobExecutor jobExecutor) {
     this(DEFAULT_NAME, jobExecutor);
@@ -30,8 +30,7 @@
   }
 
   /**
-   * @deprecated As of jBPM 3.2.6, replaced by
-   * {@link #LockMonitorThread(JobExecutor)}
+   * @deprecated As of jBPM 3.2.6, replaced by {@link #LockMonitorThread(JobExecutor)}
    */
   public LockMonitorThread(JbpmConfiguration jbpmConfiguration, int lockMonitorInterval,
     int maxLockTime, int lockBufferTime) {
@@ -39,14 +38,14 @@
   }
 
   public void run() {
-    while (isActive) {
+    while (active) {
       try {
         unlockOverdueJobs();
       }
       catch (RuntimeException e) {
         log.error("exception in " + getName(), e);
       }
-      if (isActive) {
+      if (active) {
         try {
           sleep(jobExecutor.getLockMonitorInterval());
         }
@@ -101,13 +100,9 @@
     if (isActive == false) deactivate();
   }
 
-  /**
-   * Indicates that this thread should stop running. Execution will cease
-   * shortly afterwards.
-   */
   public void deactivate() {
-    if (isActive) {
-      isActive = false;
+    if (active) {
+      active = false;
       interrupt();
     }
   }

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/msg/db/DbMessageService.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -23,7 +23,6 @@
 
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
 import org.jbpm.job.Job;
 import org.jbpm.job.executor.JobExecutor;
 import org.jbpm.msg.MessageService;
@@ -32,27 +31,25 @@
 
   private static final long serialVersionUID = 1L;
 
-  private final JobSession jobSession;
-  private final JobExecutor jobExecutor;
+  private final JbpmContext jbpmContext;
   private boolean hasProducedJobs;
 
   public DbMessageService() {
-    JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+    jbpmContext = JbpmContext.getCurrentJbpmContext();
     if (jbpmContext == null) throw new JbpmException("no active jbpm context");
-
-    jobSession = jbpmContext.getJobSession();
-    jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
   }
 
   public void send(Job job) {
-    jobSession.saveJob(job);
+    jbpmContext.getJobSession().saveJob(job);
     hasProducedJobs = true;
   }
 
   public void close() {
     // if messages were produced
-    if (hasProducedJobs && jobExecutor != null) {
-      // notify job executor
+    if (!hasProducedJobs) return;
+    // notify job executor
+    JobExecutor jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
+    if (jobExecutor != null) {
       synchronized (jobExecutor) {
         jobExecutor.notify();
       }

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java	2010-11-10 06:38:42 UTC (rev 6810)
@@ -35,7 +35,7 @@
   private static final long serialVersionUID = 1L;
 
   private final JbpmContext jbpmContext;
-  private boolean hasProducedJobs;
+  private boolean hasProducedTimers;
 
   public DbSchedulerService() {
     jbpmContext = JbpmContext.getCurrentJbpmContext();
@@ -51,7 +51,7 @@
     }
 
     jbpmContext.getJobSession().saveJob(timer);
-    hasProducedJobs = true;
+    hasProducedTimers = true;
   }
 
   public void deleteTimer(Timer timer) {
@@ -67,9 +67,7 @@
   }
 
   public void close() {
-    // if no timers were produced, just return
-    if (!hasProducedJobs) return;
-
+    if (!hasProducedTimers) return;
     // notify job executor
     JobExecutor jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
     if (jobExecutor != null) {

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2010-11-09 21:05:06 UTC (rev 6809)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2010-11-10 06:38:42 UTC (rev 6810)
@@ -262,7 +262,7 @@
     ]]>
   </query>
 
-  <query name="JobSession.getFirstAcquirableJobExcludingOwned">
+  <query name="JobSession.getFirstUnownedAcquirableJob">
     <![CDATA[
       select job
       from org.jbpm.job.Job job
@@ -311,7 +311,7 @@
     ]]>
   </query>
 
-  <query name="JobSession.getFirstDueJobExcludingOwned">
+  <query name="JobSession.getFirstUnownedDueJob">
     <![CDATA[
       select job
       from org.jbpm.job.Job job



More information about the jbpm-commits mailing list