[jbpm-commits] JBoss JBPM SVN: r6809 - in jbpm3/branches/jbpm-3.2-soa/core: src/main/java/org/jbpm/db and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 9 16:05:07 EST 2010


Author: alex.guizar at jboss.com
Date: 2010-11-09 16:05:06 -0500 (Tue, 09 Nov 2010)
New Revision: 6809

Added:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
Modified:
   jbpm3/branches/jbpm-3.2-soa/core/pom.xml
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java
   jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
   jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java
   jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
Log:
JBPM-2959 backport dispatcher thread from jBPM 4;
switch to ThreadPoolExecutor is not backwards compatible and does not restart fallen threads quickly

Modified: jbpm3/branches/jbpm-3.2-soa/core/pom.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/pom.xml	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/pom.xml	2010-11-09 21:05:06 UTC (rev 6809)
@@ -27,6 +27,12 @@
   <dependencies>
     <!-- Compile Dependencies -->
     <dependency>
+      <groupId>backport-util-concurrent</groupId>
+      <artifactId>backport-util-concurrent</artifactId>
+      <version>3.1</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>bsh</groupId>
       <artifactId>bsh</artifactId>
       <optional>true</optional>

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -212,7 +212,7 @@
 
   /**
    * Waits until all jobs are processed or a specified amount of time has elapsed. Unlike
-   * {@link #processJobs(long)}, this method does not concern itself with the job executor or
+   * {@link #processJobs(long)}, this method is not concerned about the job executor or
    * the jBPM context.
    */
   protected void waitForJobs(final long timeout) {

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/JobSession.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -49,11 +49,16 @@
 
   public Job getFirstAcquirableJob(String lockOwner) {
     try {
-      return (Job) session.getNamedQuery("JobSession.getFirstAcquirableJob")
-        .setString("lockOwner", lockOwner)
-        .setTimestamp("now", new Date())
-        .setMaxResults(1)
-        .uniqueResult();
+      Query query;
+      if (lockOwner != null) {
+        query = session.getNamedQuery("JobSession.getFirstAcquirableJob")
+          .setString("lockOwner", lockOwner);
+      }
+      else {
+        query = session.getNamedQuery("JobSession.getFirstAcquirableJobExcludingOwned");
+      }
+
+      return (Job) query.setTimestamp("now", new Date()).setMaxResults(1).uniqueResult();
     }
     catch (HibernateException e) {
       throw new JbpmPersistenceException("could not get first acquirable job", e);
@@ -88,14 +93,20 @@
   public Job getFirstDueJob(String lockOwner, Collection monitoredJobs) {
     try {
       Query query;
-      if (monitoredJobs == null || monitoredJobs.isEmpty()) {
-        query = session.getNamedQuery("JobSession.getFirstDueJob");
+      if (lockOwner == null) {
+        query = session.getNamedQuery("JobSession.getFirstDueJobExcludingOwned");
       }
+      else if (monitoredJobs == null || monitoredJobs.isEmpty()) {
+        query = session.getNamedQuery("JobSession.getFirstDueJob")
+          .setString("lockOwner", lockOwner);
+      }
       else {
-        query = session.getNamedQuery("JobSession.getFirstDueJobExcludingMonitoredJobs");
-        query.setParameterList("monitoredJobIds", monitoredJobs);
+        query = session.getNamedQuery("JobSession.getFirstDueJobExcludingMonitoredJobs")
+          .setString("lockOwner", lockOwner)
+          .setParameterList("monitoredJobIds", monitoredJobs);
       }
-      return (Job) query.setString("lockOwner", lockOwner).setMaxResults(1).uniqueResult();
+
+      return (Job) query.setMaxResults(1).uniqueResult();
     }
     catch (HibernateException e) {
       throw new JbpmPersistenceException("could not get first due job owned by '" + lockOwner
@@ -109,10 +120,8 @@
       if (job instanceof Timer) {
         Timer timer = (Timer) job;
         Action action = timer.getAction();
-        if (action != null && action.getId() == 0) {
-          // transient action, save it
-          session.save(action);
-        }
+        // if action is transient, save it
+        if (action != null && action.getId() == 0L) session.save(action);
       }
     }
     catch (HibernateException e) {
@@ -255,8 +264,8 @@
         .list();
     }
     catch (HibernateException e) {
-      throw new JbpmPersistenceException("could not find jobs with lock time over " +
-        threshold, e);
+      throw new JbpmPersistenceException("could not find jobs with lock time over " + threshold,
+        e);
     }
   }
 

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/graph/def/Transition.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -128,8 +128,7 @@
    */
   public void take(ExecutionContext executionContext) {
     if (condition != null && isConditionEnforced) {
-      Boolean result = (Boolean) JbpmExpressionEvaluator.evaluate(condition, executionContext,
-        Boolean.class);
+      Boolean result = (Boolean) JbpmExpressionEvaluator.evaluate(condition, executionContext, Boolean.class);
       if (!Boolean.TRUE.equals(result)) {
         throw new JbpmException("condition '" + condition + "' guarding " + this + " not met");
       }
@@ -140,7 +139,8 @@
     token.setNode(null);
 
     // start the transition log
-    TransitionLog transitionLog = new TransitionLog(this, executionContext.getTransitionSource());
+    TransitionLog transitionLog = new TransitionLog(this,
+      executionContext.getTransitionSource());
     token.startCompositeLog(transitionLog);
     try {
       // fire leave events for superstates (if any)
@@ -248,6 +248,12 @@
     return result;
   }
 
+  public String toString() {
+    return "Transition("
+      + (name != null ? name + ')' : (from != null && to != null) ? from + "->" + to
+        : id != 0 ? id + ")" : '@' + Integer.toHexString(hashCode()));
+  }
+
   // other
   // ///////////////////////////////////////////////////////////////////////////
 

Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.JbpmContext;
+import org.jbpm.job.Job;
+
+/**
+ * Thread responsible for acquiring jobs and then dispatching them to one of the threads in the
+ * job executor thread pool.
+ * 
+ * @author Alejandro Guizar
+ */
+class DispatcherThread extends Thread {
+
+  private static final Log log = LogFactory.getLog(DispatcherThread.class);
+
+  public static final String DEFAULT_NAME = "Dispatcher";
+
+  private JobExecutor jobExecutor;
+  private volatile boolean active = true;
+
+  DispatcherThread(JobExecutor jobExecutor) {
+    this(DEFAULT_NAME, jobExecutor);
+  }
+
+  DispatcherThread(String name, JobExecutor jobExecutor) {
+    super(name);
+    this.jobExecutor = jobExecutor;
+  }
+
+  public void run() {
+    int retryInterval = jobExecutor.getRetryInterval();
+    while (active) {
+      // acquire job; on exception, call returns null
+      Job job = acquireJob();
+      // submit job
+      if (job != null) submitJob(job);
+
+      // if still active wait for next job
+      if (active) {
+        try {
+          if (job != null) {
+            // reset the current retry interval
+            retryInterval = jobExecutor.getRetryInterval();
+            // wait for next due job
+            long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
+            if (waitPeriod > 0) {
+              synchronized (jobExecutor) {
+                jobExecutor.wait(waitPeriod);
+              }
+            }
+          }
+          else {
+            // sleep instead of waiting on jobExecutor
+            // to prevent DbMessageService from waking up this thread
+            sleep(retryInterval);
+            // after an exception, double the current retry interval
+            // to avoid continuous failures during anomalous conditions
+            retryInterval *= 2;
+            // enforce maximum idle interval
+            int maxIdleInterval = jobExecutor.getMaxIdleInterval();
+            if (retryInterval > maxIdleInterval || retryInterval < 0) {
+              retryInterval = maxIdleInterval;
+            }
+          }
+        }
+        catch (InterruptedException e) {
+          if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
+        }
+      }
+    }
+    log.info(getName() + " leaves cyberspace");
+  }
+
+  private Job acquireJob() {
+    boolean debug = log.isDebugEnabled();
+    Job job;
+    // acquire job executor's monitor before creating context and allocating resources
+    synchronized (jobExecutor) {
+      JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+      try {
+        // look for available job
+        job = jbpmContext.getJobSession().getFirstAcquirableJob(null);
+        // is there a job?
+        if (job != null) {
+          // lock job
+          job.setLockOwner(getName());
+          job.setLockTime(new Date());
+          // has job failed previously?
+          if (job.getException() != null) {
+            // decrease retry count
+            int retries = job.getRetries() - 1;
+            job.setRetries(retries);
+            if (debug) log.debug(job + " has " + retries + " retries remaining");
+          }
+          if (debug) log.debug("acquired " + job);
+        }
+        else if (debug) log.debug("no acquirable job found");
+      }
+      catch (RuntimeException e) {
+        jbpmContext.setRollbackOnly();
+        job = null;
+        if (debug) log.debug("failed to acquire job", e);
+      }
+      catch (Error e) {
+        jbpmContext.setRollbackOnly();
+        throw e;
+      }
+      finally {
+        try {
+          jbpmContext.close();
+        }
+        catch (RuntimeException e) {
+          job = null;
+          if (debug) log.debug("failed to acquire job", e);
+        }
+      }
+    }
+    return job;
+  }
+
+  private void submitJob(Job job) {
+    JobParcel jobParcel = new JobParcel(jobExecutor, job);
+    jobExecutor.getExecutorService().execute(jobParcel);
+    if (log.isDebugEnabled()) log.debug("submitted " + job);
+  }
+
+  private long getWaitPeriod(int currentIdleInterval) {
+    Date nextDueDate = getNextDueDate();
+    if (nextDueDate != null) {
+      long waitPeriod = nextDueDate.getTime() - System.currentTimeMillis();
+      if (waitPeriod < currentIdleInterval) return waitPeriod;
+    }
+    return currentIdleInterval;
+  }
+
+  private Date getNextDueDate() {
+    Date nextDueDate;
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      Job job = jbpmContext.getJobSession().getFirstDueJob(null, null);
+      if (job != null) {
+        nextDueDate = job.getDueDate();
+      }
+      else {
+        nextDueDate = null;
+        if (log.isDebugEnabled()) log.debug("no due job found");
+      }
+    }
+    catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      nextDueDate = null;
+      if (log.isDebugEnabled()) log.debug("failed to determine next due date", e);
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    finally {
+      try {
+        jbpmContext.close();
+      }
+      catch (RuntimeException e) {
+        nextDueDate = null;
+        if (log.isDebugEnabled()) log.debug("failed to determine next due date", e);
+      }
+    }
+    return nextDueDate;
+  }
+
+  /**
+   * Indicates that this thread should stop running. Execution will cease shortly afterwards.
+   */
+  public void deactivate() {
+    if (active) {
+      active = false;
+      interrupt();
+    }
+  }
+}


Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -8,7 +8,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -18,6 +17,14 @@
 
 import org.jbpm.JbpmConfiguration;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
+import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
+import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
 public class JobExecutor implements Serializable {
 
   private static final long serialVersionUID = 1L;
@@ -36,10 +43,13 @@
   /** @deprecated property has no effect */
   protected int lockBufferTime;
 
-  private ThreadGroup threadGroup;
+  ThreadGroup threadGroup;
+  private ExecutorService executorService;
+
   /** @deprecated call {@link #getThreads()} instead */
   protected Map threads;
-  /** @deprecated call {@link #getThreads()} instead */
+
+  private DispatcherThread dispatcherThread;
   protected LockMonitorThread lockMonitorThread;
 
   protected Map monitoredJobIds = new Hashtable();
@@ -55,8 +65,8 @@
       // create thread group
       threadGroup = new ThreadGroup(name) {
         public void uncaughtException(Thread thread, Throwable throwable) {
-          if (thread instanceof JobExecutorThread) {
-            startThread(thread.getName());
+          if (thread instanceof DispatcherThread) {
+            startDispatcherThread();
           }
           else if (thread instanceof LockMonitorThread) {
             startLockMonitorThread();
@@ -65,11 +75,28 @@
         }
       };
 
-      // start executor threads
-      for (int i = 0; i < nbrOfThreads; i++) {
-        startThread();
-      }
+      // create thread factory
+      ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable task) {
+          Thread thread = new Thread(threadGroup, task, getNextThreadName());
+          if (thread.isDaemon()) thread.setDaemon(false);
+          if (thread.getPriority() != Thread.NORM_PRIORITY)
+            thread.setPriority(Thread.NORM_PRIORITY);
+          return thread;
+        }
+      };
 
+      // start executor service
+      executorService = new ThreadPoolExecutor(nbrOfThreads,
+        nbrOfThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new SynchronousQueue(),
+        threadFactory,
+        JobRejectionHandler.INSTANCE);
+
+      // start helper threads
+      startDispatcherThread();
       startLockMonitorThread();
       isStarted = true;
     }
@@ -78,10 +105,24 @@
     }
   }
 
+  private static class JobRejectionHandler implements RejectedExecutionHandler {
+
+    static final JobRejectionHandler INSTANCE = new JobRejectionHandler();
+
+    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+      try {
+        executor.getQueue().put(task);
+      }
+      catch (InterruptedException e) {
+        throw new RejectedExecutionException("queuing " + task + " got interrupted", e);
+      }
+    }
+  }
+
   /**
-   * signals to all threads in this job executor to stop. Threads may be in the middle of
-   * processing a job and they will finish that first. Use {@link #stopAndJoin()} in case you
-   * want a method that blocks until all the threads are actually finished.
+   * tells all threads in this job executor to stop. Threads may be in the middle of processing
+   * a job and they will finish that first. Use {@link #stopAndJoin()} in case you want a method
+   * that blocks until all the threads are actually finished.
    * 
    * @return a list of the stopped threads. In case no threads were stopped an empty list will
    * be returned.
@@ -92,57 +133,71 @@
       return Collections.EMPTY_LIST;
     }
 
-    log.info("stopping " + name);
-    isStarted = false;
-
+    // list active threads
     Thread[] activeThreads = new Thread[threadGroup.activeCount()];
-    int threadCount = threadGroup.enumerate(activeThreads);
-    for (int i = 0; i < threadCount; i++) {
-      stopThread(activeThreads[i]);
-    }
+    threadGroup.enumerate(activeThreads);
 
+    // tell threads to stop
+    stopThreads();
+
+    // return formerly active threads
     return Arrays.asList(activeThreads);
   }
 
   public void stopAndJoin() throws InterruptedException {
-    for (Iterator i = stop().iterator(); i.hasNext();) {
-      Thread thread = (Thread) i.next();
-      thread.join();
+    if (!isStarted) {
+      if (log.isDebugEnabled()) log.debug("ignoring stop, " + name + " not started");
+      return;
     }
+
+    // tell threads to stop
+    stopThreads();
+
+    // wait for executor to terminate
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+    // join helper threads
+    dispatcherThread.join();
+    lockMonitorThread.join();
   }
 
-  public void ensureThreadsAreActive() {
-    int activeCount = threadGroup.activeCount();
-    if (activeCount < nbrOfThreads + 1) {
-      // find out who is missing
-      Thread[] activeThreads = new Thread[activeCount];
-      activeCount = threadGroup.enumerate(activeThreads);
+  private void stopThreads() {
+    log.info("stopping " + name);
+    isStarted = false;
 
-      for (int i = 1; i <= nbrOfThreads; i++) {
-        String threadName = getThreadName(i);
-        if (!contains(activeThreads, activeCount, threadName)) {
-          // thread-i is missing, restart it
-          startThread(threadName);
-        }
-      }
-    }
+    // shut down execution service
+    executorService.shutdown();
+
+    // deactivate helper threads
+    dispatcherThread.deactivate();
+    lockMonitorThread.deactivate();
   }
 
-  private static boolean contains(Thread[] threads, int count, String threadName) {
-    for (int i = 0; i < count; i++) {
-      if (threadName.equals(threads[i].getName())) return true;
+  public void ensureThreadsAreActive() {
+    // executor service looks after its own threads
+    // just confirm dispatcher and lock monitor are alive
+    if (!dispatcherThread.isAlive()) {
+      startDispatcherThread();
     }
-    return false;
+    if (!lockMonitorThread.isAlive()) {
+      startLockMonitorThread();
+    }
   }
 
   ThreadGroup getThreadGroup() {
     return threadGroup;
   }
 
+  ExecutorService getExecutorService() {
+    return executorService;
+  }
+
+  /** @deprecated no longer invoked */
   protected void startThread() {
     startThread(getNextThreadName());
   }
 
+  /** @deprecated no longer invoked */
   protected void startThread(String threadName) {
     Thread thread = createThread(threadName);
 
@@ -150,6 +205,7 @@
     thread.start();
   }
 
+  /** @deprecated no longer invoked */
   protected Thread createThread(String threadName) {
     return new JobExecutorThread(threadName, this);
   }
@@ -158,11 +214,12 @@
     return getThreadName(threadGroup.activeCount() + 1);
   }
 
+  /** @deprecated no longer invoked */
   protected String getLastThreadName() {
     return getThreadName(threadGroup.activeCount());
   }
 
-  /** @deprecated */
+  /** @deprecated no longer invoked */
   protected synchronized Thread stopThread() {
     String threadName = getLastThreadName();
     if (log.isDebugEnabled()) log.debug("stopping " + threadName);
@@ -187,18 +244,22 @@
     return name + '@' + getHostAddress() + ":Executor-" + index;
   }
 
+  void startDispatcherThread() {
+    String threadName = name + '@' + getHostAddress() + ':' + DispatcherThread.DEFAULT_NAME;
+    dispatcherThread = new DispatcherThread(threadName, this);
+
+    if (log.isDebugEnabled()) log.debug("starting " + threadName);
+    dispatcherThread.start();
+  }
+
   void startLockMonitorThread() {
-    String threadName = getLockMonitorThreadName();
-    Thread lockMonitorThread = new LockMonitorThread(threadName, this);
+    String threadName = name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
+    lockMonitorThread = new LockMonitorThread(threadName, this);
 
     if (log.isDebugEnabled()) log.debug("starting " + threadName);
     lockMonitorThread.start();
   }
 
-  private String getLockMonitorThreadName() {
-    return name + '@' + getHostAddress() + ':' + LockMonitorThread.DEFAULT_NAME;
-  }
-
   private static String getHostAddress() {
     try {
       return InetAddress.getLocalHost().getHostAddress();

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -175,11 +175,9 @@
         job = jobSession.getFirstAcquirableJob(lockOwner);
         // is there a job?
         if (job != null) {
-          // acquire jobs
-          Date lockTime = new Date();
           // lock job
           job.setLockOwner(lockOwner);
-          job.setLockTime(lockTime);
+          job.setLockTime(new Date());
           // has job failed previously?
           if (job.getException() != null) {
             // decrease retry count

Added: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.job.executor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.JbpmContext;
+import org.jbpm.db.JobSession;
+import org.jbpm.job.Job;
+import org.jbpm.persistence.db.DbPersistenceService;
+import org.jbpm.persistence.db.StaleObjectLogConfigurer;
+
+/**
+ * @author Alejandro Guizar
+ */
+class JobParcel implements Runnable {
+
+  private static final Log log = LogFactory.getLog(JobParcel.class);
+
+  private final JobExecutor jobExecutor;
+  private final Job job;
+
+  JobParcel(JobExecutor jobExecutor, Job job) {
+    this.jobExecutor = jobExecutor;
+    this.job = job;
+  }
+
+  public void run() {
+    try {
+      executeJob();
+    }
+    catch (Exception e) {
+      // on exception, call returns normally
+      saveJobException(e);
+    }
+  }
+
+  private void executeJob() throws Exception {
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      if (job.isExclusive()) {
+        jbpmContext.getGraphSession().lockProcessInstance(job.getProcessInstance());
+      }
+
+      JobSession jobSession = jbpmContext.getJobSession();
+      jobSession.reattachJob(job);
+
+      // register process instance for automatic save
+      // https://jira.jboss.org/browse/JBPM-1015
+      jbpmContext.addAutoSaveProcessInstance(job.getProcessInstance());
+
+      if (log.isDebugEnabled()) log.debug("executing " + job);
+      if (job.execute(jbpmContext)) jobSession.deleteJob(job);
+    }
+    catch (Exception e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    finally {
+      jbpmContext.close();
+    }
+  }
+
+  private void saveJobException(Exception exception) {
+    // if this is a locking exception, keep it quiet
+    if (DbPersistenceService.isLockingException(exception)) {
+      StaleObjectLogConfigurer.getStaleObjectExceptionsLog()
+        .error("failed to execute " + job, exception);
+    }
+    else {
+      log.error("failed to execute " + job, exception);
+    }
+
+    boolean debug = log.isDebugEnabled();
+    JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
+    try {
+      // do not reattach existing job as it contains undesired updates
+      JobSession jobSession = jbpmContext.getJobSession();
+      Job job = jobSession.loadJob(this.job.getId());
+
+      // print and save exception
+      StringWriter out = new StringWriter();
+      exception.printStackTrace(new PrintWriter(out));
+      job.setException(out.toString());
+
+      // unlock job so it can be dispatched again
+      job.setLockOwner(null);
+      // notify job executor
+      synchronized (jobExecutor) {
+        jobExecutor.notify();
+      }
+    }
+    catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      if (debug) log.debug("failed to save job exception", e);
+    }
+    catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    }
+    finally {
+      try {
+        jbpmContext.close();
+      }
+      catch (RuntimeException e) {
+        if (debug) log.debug("failed to save job exception", e);
+      }
+    }
+  }
+
+  public String toString() {
+    return "JobParcel(" + job + ')';
+  }
+}


Property changes on: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/job/executor/JobParcel.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/CustomLoaderObjectInputStream.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -34,7 +34,7 @@
  * manner.
  * 
  * @author Alejandro Guizar
- * @deprecated no use for this class
+ * @deprecated not in use anymore
  */
 public class CustomLoaderObjectInputStream extends ObjectInputStream {
 
@@ -73,7 +73,8 @@
    * security-sensitive methods; note that this class does <em>not</em>
    * override said methods
    */
-  public CustomLoaderObjectInputStream(InputStream in, ClassLoader customLoader) throws IOException {
+  public CustomLoaderObjectInputStream(InputStream in, ClassLoader customLoader)
+    throws IOException {
     super(in);
     if (customLoader == null) {
       throw new IllegalArgumentException("custom class loader is null");
@@ -88,7 +89,8 @@
     return customLoader;
   }
 
-  protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+  protected Class resolveClass(ObjectStreamClass desc) throws IOException,
+    ClassNotFoundException {
     try {
       return super.resolveClass(desc);
     }
@@ -97,7 +99,8 @@
     }
   }
 
-  protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+  protected Class resolveProxyClass(String[] interfaces) throws IOException,
+    ClassNotFoundException {
     try {
       return super.resolveProxyClass(interfaces);
     }
@@ -119,11 +122,11 @@
       }
       try {
         return Proxy.getProxyClass(nonPublicLoader != null ? nonPublicLoader : customLoader,
-            classes);
+          classes);
       }
       catch (IllegalArgumentException iae) {
         throw new ClassNotFoundException("could not get proxy class for interfaces: "
-            + ArrayUtil.toString(classes), e);
+          + ArrayUtil.toString(classes), e);
       }
     }
   }

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/EqualsUtil.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -23,6 +23,7 @@
 
 import org.hibernate.proxy.HibernateProxy;
 
+/** @deprecated not in use anymore */
 public class EqualsUtil {
 
   private EqualsUtil() {
@@ -32,12 +33,14 @@
   /**
    * hack to support comparing hibernate proxies against the real objects.
    * since it falls back to ==, clients don't need to override hashcode.
+   *
    * @deprecated hack does not work
    * @see <a href="https://jira.jboss.org/jira/browse/JBPM-2489">JBPM-2489</a>
    */
   public static boolean equals(Object thisObject, Object otherObject) {
-    return thisObject == otherObject || otherObject instanceof HibernateProxy
-        && otherObject.equals(thisObject);
+    return thisObject == otherObject
+      || otherObject instanceof HibernateProxy
+      && otherObject.equals(thisObject);
   }
 
 }

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/StringUtil.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -3,12 +3,16 @@
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 
+/** @deprecated not in use anymore */
 public class StringUtil implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  static final byte[] HEX_CHAR_TABLE = { (byte) '0', (byte) '1', (byte) '2', (byte) '3', (byte) '4', (byte) '5', (byte) '6', (byte) '7', (byte) '8',
-      (byte) '9', (byte) 'a', (byte) 'b', (byte) 'c', (byte) 'd', (byte) 'e', (byte) 'f' };
+  static final byte[] HEX_CHAR_TABLE = {
+    (byte) '0', (byte) '1', (byte) '2', (byte) '3', (byte) '4', (byte) '5', (byte) '6',
+    (byte) '7', (byte) '8', (byte) '9', (byte) 'a', (byte) 'b', (byte) 'c', (byte) 'd',
+    (byte) 'e', (byte) 'f'
+  };
 
   private StringUtil() {
     // hide default constructor to prevent instantiation
@@ -19,27 +23,27 @@
       byte[] hex = new byte[2 * bytes.length];
       int index = 0;
 
-      for (int i=0; i<bytes.length; i++) {
+      for (int i = 0; i < bytes.length; i++) {
         byte b = bytes[i];
         int v = b & 0xFF;
         hex[index++] = HEX_CHAR_TABLE[v >>> 4];
         hex[index++] = HEX_CHAR_TABLE[v & 0xF];
       }
       return new String(hex, "US-ASCII");
-    } catch (UnsupportedEncodingException e) {
+    }
+    catch (UnsupportedEncodingException e) {
       // should not happen, US-ASCII is a standard charset
       throw new AssertionError(e);
     }
   }
-  
+
   public static String toHexStringHibernate(byte[] bytes) {
     StringBuffer buf = new StringBuffer();
-    for ( int i=0; i<bytes.length; i++ ) {
-        String hexStr = Integer.toHexString( bytes[i] - Byte.MIN_VALUE );
-        if ( hexStr.length()==1 ) buf.append('0');
-        buf.append(hexStr);
+    for (int i = 0; i < bytes.length; i++) {
+      String hexStr = Integer.toHexString(bytes[i] - Byte.MIN_VALUE);
+      if (hexStr.length() == 1) buf.append('0');
+      buf.append(hexStr);
     }
     return buf.toString();
   }
-  
 }
\ No newline at end of file

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/util/XmlException.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -24,7 +24,9 @@
 import org.jbpm.JbpmException;
 
 public class XmlException extends JbpmException {
+
   private static final long serialVersionUID = 1L;
+
   public XmlException(String message, Throwable cause) {
     super(message, cause);
   }

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2010-11-09 21:05:06 UTC (rev 6809)
@@ -262,6 +262,18 @@
     ]]>
   </query>
 
+  <query name="JobSession.getFirstAcquirableJobExcludingOwned">
+    <![CDATA[
+      select job
+      from org.jbpm.job.Job job
+      where job.lockOwner is null
+      and job.retries > 0
+      and job.dueDate <= :now
+      and job.isSuspended = false
+      order by job.dueDate asc
+    ]]>
+  </query>
+
   <query name="JobSession.findExclusiveJobs">
     <![CDATA[
       select job
@@ -299,6 +311,17 @@
     ]]>
   </query>
 
+  <query name="JobSession.getFirstDueJobExcludingOwned">
+    <![CDATA[
+      select job
+      from org.jbpm.job.Job job
+      where job.lockOwner is null
+      and job.retries > 0
+      and job.isSuspended = false
+      order by job.dueDate asc
+    ]]>
+  </query>
+
   <query name="JobSession.suspendJobs">
     <![CDATA[
       update org.jbpm.job.Job job

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/SerializabilityTest.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -65,7 +65,10 @@
     "org.jbpm.instantiation.FieldInstantiator",
     "org.jbpm.instantiation.ProcessClassLoader",
     "org.jbpm.instantiation.XmlInstantiator",
+    "org.jbpm.job.executor.DispatcherThread",
     "org.jbpm.job.executor.JobExecutorThread",
+    "org.jbpm.job.executor.JobExecutor$JobRejectionHandler",
+    "org.jbpm.job.executor.JobParcel",
     "org.jbpm.job.executor.LockMonitorThread",
     "org.jbpm.jpdl.convert.Converter",
     "org.jbpm.jpdl.el.",

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java	2010-11-09 16:42:36 UTC (rev 6808)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java	2010-11-09 21:05:06 UTC (rev 6809)
@@ -48,14 +48,13 @@
     deployProcessDefinition(processDefinition);
   }
 
-  // check if the process ends correctly if no Error is thrown
+  /** check if the process ends correctly if no Error is thrown */
   public void testTimerWithoutErrorAction() {
     throwError = false;
     runTimerErrorAction();
   }
 
-  // check if the process ends correctly if an Error is thrown in the
-  // ActionHandler
+  /** check if the process ends correctly if an Error is thrown in the ActionHandler */
   public void testTimerWithErrorAction() {
     throwError = true;
     runTimerErrorAction();
@@ -67,9 +66,8 @@
     processInstance.signal();
 
     processJobs();
-
     processInstance = jbpmContext.loadProcessInstance(processInstance.getId());
-    assert processInstance.hasEnded() : processInstance;
+    assert processInstance.hasEnded() : "expected " + processInstance + " to have ended";
   }
 
   public static class TimerExceptionAction implements ActionHandler {



More information about the jbpm-commits mailing list