[jbpm-commits] JBoss JBPM SVN: r6920 - in jbpm3/branches/3.2.10.SP/core/src/main: java/org/jbpm/job/executor and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri May 13 11:09:03 EDT 2011


Author: jcoleman at redhat.com
Date: 2011-05-13 11:09:03 -0400 (Fri, 13 May 2011)
New Revision: 6920

Modified:
   jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java
   jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
   jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
   jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
   jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
Log:
Pull up revisions 6918, 6919 from jbpm-3.2-soa branch:

SOA-3007.
Fix problems with the split of the old JobExecutorThread(s) into the new
DispatcherThread + JobExecutorThread(s):
  o  the DispatcherThread busy-waits in a loop, needlessly consuming CPU
  o  job processing could be deferred for an indeterminate time
  o  a crash at certain points could leave jobs locked (and therefore
     unprocessed) indefinitely
Fix by kconner (e-mail address elided by the list archive).


Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java	2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/db/JobSession.java	2011-05-13 15:09:03 UTC (rev 6920)
@@ -275,4 +275,14 @@
     }
     return session.createCriteria(Job.class).add(Restrictions.in("id", jobs)).list();
   }
+
+  public void releaseLockedJobs(final String lockOwner) { // unlocks (lockOwner/lockTime = null) jobs held by lockOwner that still have retries and are not suspended
+    try {
+      session.getNamedQuery("JobSession.releaseLockedJobs")
+        .setString("lockOwner", lockOwner)
+        .executeUpdate(); // affected-row count is ignored
+    } catch (HibernateException e) {
+      throw new JbpmPersistenceException("could not release locked jobs by owner '" + lockOwner + "'", e);
+    }
+  }
 }

Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/DispatcherThread.java	2011-05-13 15:09:03 UTC (rev 6920)
@@ -52,40 +52,28 @@
   }
 
   public void run() {
-    int retryInterval = jobExecutor.getRetryInterval();
     while (active) {
-      // acquire job; on exception, call returns null
-      Job job = acquireJob();
-      // submit job
-      if (job != null) submitJob(job);
+      if (jobExecutor.waitForFreeExecutorThread()) {
+        // acquire job; on exception, call returns null
+        Job job = acquireJob();
+        // submit job
+        if (job != null) {
+          submitJob(job);
+          continue ;
+        }
+      }
 
       // if still active, wait or sleep
       if (active) {
         try {
-          if (job != null) {
-            // reset the current retry interval
-            retryInterval = jobExecutor.getRetryInterval();
-            // wait for next due job
-            long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
-            if (waitPeriod > 0) {
-              synchronized (jobExecutor) {
+          // wait for next due job
+          long waitPeriod = getWaitPeriod(jobExecutor.getIdleInterval());
+          if (waitPeriod > 0) {
+            synchronized (jobExecutor) {
+              if (active)
                 jobExecutor.wait(waitPeriod);
-              }
             }
           }
-          else {
-            // sleep instead of waiting on jobExecutor
-            // to prevent message/scheduler service from waking up this thread
-            sleep(retryInterval);
-            // after an exception, double the current retry interval
-            // to avoid continuous failures during anomalous conditions
-            retryInterval *= 2;
-            // enforce maximum idle interval
-            int maxIdleInterval = jobExecutor.getMaxIdleInterval();
-            if (retryInterval > maxIdleInterval || retryInterval < 0) {
-              retryInterval = maxIdleInterval;
-            }
-          }
         }
         catch (InterruptedException e) {
           if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
@@ -144,12 +132,8 @@
   }
 
   private void submitJob(Job job) {
-    try {
-      jobExecutor.getQueue().put(job);
-    }
-    catch (InterruptedException e) {
+    if (!jobExecutor.submitJob(job)) { // false when no executor thread is free to take the job; the job is then passed to unlockJob()
       unlockJob(job);
-      if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
     }
   }
 

Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2011-05-13 15:09:03 UTC (rev 6920)
@@ -9,6 +9,7 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -16,9 +17,13 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
+import org.jbpm.db.JobSession;
+import org.jbpm.job.Job;
 
-import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Condition;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
 
 public class JobExecutor implements Serializable {
 
@@ -39,7 +44,13 @@
   protected int lockBufferTime;
 
   private ThreadGroup threadGroup;
-  private BlockingQueue queue = new SynchronousQueue();
+  private int waitingExecutorCount ;
+  private boolean waitingDispatcher ;
+  private boolean dispatcherActive ;
+  private Lock waitingExecutorLock = new ReentrantLock() ;
+  private Condition waitingExecutorCondition = waitingExecutorLock.newCondition() ;
+  private Condition waitingDispatcherCondition = waitingExecutorLock.newCondition() ;
+  private LinkedList dispatchedJobs = new LinkedList();
 
   /** @deprecated call {@link #getThreads()} instead */
   protected Map threads;
@@ -56,6 +67,8 @@
     if (!isStarted) {
       log.info("starting " + name);
 
+      activateDispatcher() ;
+      
       // create thread group
       threadGroup = new ThreadGroup(name) {
         public void uncaughtException(Thread thread, Throwable throwable) {
@@ -119,6 +132,8 @@
       }
     }
 
+    deactivateDispatcher() ;
+    
     // return deactivated threads
     return deactivatedThreads;
   }
@@ -158,10 +173,6 @@
     return threadGroup;
   }
 
-  BlockingQueue getQueue() {
-    return queue;
-  }
-
   private String getThreadName(int index) {
     return name + '@' + getHostAddress() + ":Executor-" + index;
   }
@@ -418,5 +429,116 @@
     this.nbrOfThreads = nbrOfThreads;
   }
 
+  private boolean hasFreeExecutor() { // true when more executor threads wait in getJob() than jobs are queued for them
+    waitingExecutorLock.lock() ; // ReentrantLock: safe to call from methods that already hold it
+	try {
+      return (waitingExecutorCount > dispatchedJobs.size()) ;
+    } finally {
+      waitingExecutorLock.unlock() ;
+    }
+  }
+  // Dispatcher side: blocks until an executor thread is free. Returns false when interrupted, when the dispatcher is inactive on entry, or when a wake-up still finds no free executor.
+  boolean waitForFreeExecutorThread() {
+    waitingExecutorLock.lock() ;
+    try {
+      waitingDispatcher = true ; // tells getJob() to signal waitingDispatcherCondition
+      if (dispatcherActive) {
+        if (hasFreeExecutor()) {
+          return true ;
+        } else {
+          waitingDispatcherCondition.await() ; // single await, no loop: any wake-up (signal, spurious, shutdown) falls through to the re-check
+          return hasFreeExecutor() ;
+        }
+      }
+    } catch (final InterruptedException ie) { // NOTE(review): interrupt status is not restored; the caller only sees false
+    } finally {
+      waitingDispatcher = false ;
+      waitingExecutorLock.unlock() ;
+    }
+    return false ;
+  }
+  
+  // Executor side: waits for a dispatched job. Returns null when interrupted, when the dispatcher is inactive on entry, or when woken with nothing queued.
+  Job getJob() {
+    waitingExecutorLock.lock() ;
+    try {
+      waitingExecutorCount++ ; // counts this thread as free for hasFreeExecutor() while it waits
+      if (dispatcherActive) { 
+        if (waitingDispatcher && hasFreeExecutor()) {
+          waitingDispatcherCondition.signal() ; // wake the dispatcher parked in waitForFreeExecutorThread()
+        }
+        if (dispatchedJobs.isEmpty()) {
+          waitingExecutorCondition.await() ; // woken by submitJob() (signal) or deactivateDispatcher() (signalAll); single await, no loop
+        }
+        if (dispatchedJobs.size() > 0) {
+          return (Job)dispatchedJobs.remove() ; // takes the oldest queued job (add() appends, remove() takes the head)
+        }
+      }
+    } catch (final InterruptedException ie) { // NOTE(review): interrupt status is not restored; JobExecutorThread.run() loops and calls getJob() again
+    } finally {
+      waitingExecutorCount-- ;
+      waitingExecutorLock.unlock() ;
+    }
+    return null ;
+  }
+  
+  boolean submitJob(final Job job) { // queues job for a waiting executor; returns false (job not queued) when none is free
+	waitingExecutorLock.lock() ;
+	try {
+	  if (hasFreeExecutor()) {
+        dispatchedJobs.add(job) ;
+        waitingExecutorCondition.signal() ; // wake one executor blocked in getJob()
+        return true ;
+      }
+    } finally {
+      waitingExecutorLock.unlock() ;
+    }
+    return false ;
+  }
+
+  private void activateDispatcher() { // called while the executor is starting
+    waitingExecutorLock.lock() ;
+    try {
+      if (!dispatcherActive) {
+        unlockOurJobs() ; // release stale locks before dispatching is enabled; note this does database work while holding the lock
+        dispatcherActive = true ;
+      }
+    } finally {
+      waitingExecutorLock.unlock() ;
+    }
+  }
+  
+  private void unlockOurJobs() { // best effort: release job locks owned under this executor's name, e.g. left behind by a crash
+    final JbpmContext jbpmContext = getJbpmConfiguration().createJbpmContext();
+    try {
+      final String lockOwner = getName(); // NOTE(review): assumes jobs are locked under this executor's name - confirm against the dispatcher's lock owner
+      final JobSession jobSession = jbpmContext.getJobSession();
+      jobSession.releaseLockedJobs(lockOwner);
+    } catch (RuntimeException e) {
+      jbpmContext.setRollbackOnly();
+      if (log.isDebugEnabled()) log.debug("failed to release locked jobs", e); // NOTE(review): logged at debug only, so a failure here is easy to miss
+    } catch (Error e) {
+      jbpmContext.setRollbackOnly();
+      throw e;
+    } finally {
+      try {
+        jbpmContext.close();
+      }  catch (RuntimeException e) {
+        if (log.isDebugEnabled()) log.debug("failed to release locked jobs", e); // same message, but here closing the context failed
+      }
+    }
+  }
+
+  private void deactivateDispatcher() { // called when stopping, just before the deactivated threads are returned
+    waitingExecutorLock.lock() ;
+    try {
+      dispatcherActive = false ;
+      waitingDispatcherCondition.signal() ; // wake the dispatcher parked in waitForFreeExecutorThread()
+      waitingExecutorCondition.signalAll() ; // getJob() callers then return a still-queued job, else null
+    } finally {
+      waitingExecutorLock.unlock() ;
+    }
+  }
+  
   private static Log log = LogFactory.getLog(JobExecutor.class);
 }

Modified: jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/java/org/jbpm/job/executor/JobExecutorThread.java	2011-05-13 15:09:03 UTC (rev 6920)
@@ -7,6 +7,7 @@
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -23,6 +24,7 @@
 
   private final JobExecutor jobExecutor;
   private volatile boolean active = true;
+  private Random random = new Random() ;
 
   public JobExecutorThread(String name, JobExecutor jobExecutor) {
     super(jobExecutor.getThreadGroup(), name);
@@ -42,7 +44,7 @@
   public void run() {
     while (active) {
       // take on next job
-      Job job = acquireJob();
+      Job job = jobExecutor.getJob();
       // if an exception occurs, acquireJob() returns null
       if (job != null) {
         try {
@@ -126,16 +128,6 @@
     return jobs;
   }
 
-  private Job acquireJob() {
-    try {
-      return (Job) jobExecutor.getQueue().take();
-    }
-    catch (InterruptedException e) {
-      if (log.isDebugEnabled()) log.debug(getName() + " got interrupted");
-      return null;
-    }
-  }
-
   protected void executeJob(Job job) throws Exception {
     JbpmContext jbpmContext = jobExecutor.getJbpmConfiguration().createJbpmContext();
     try {
@@ -192,10 +184,9 @@
       // unlock job so it can be dispatched again
       job.setLockOwner(null);
       job.setLockTime(null);
-      // notify job executor
-      synchronized (jobExecutor) {
-        jobExecutor.notify();
-      }
+      int waitPeriod = jobExecutor.getRetryInterval() / 2;
+      waitPeriod += random.nextInt(waitPeriod) ;
+      job.setDueDate(new Date(System.currentTimeMillis() + waitPeriod)) ;
     }
     catch (RuntimeException e) {
       jbpmContext.setRollbackOnly();
@@ -213,6 +204,10 @@
         log.warn("failed to save exception for " + job, e);
       }
     }
+    // notify job executor
+    synchronized (jobExecutor) {
+      jobExecutor.notify();
+    }
   }
 
   private void unlockJob(Job job) {
@@ -224,10 +219,6 @@
       // unlock job
       job.setLockOwner(null);
       job.setLockTime(null);
-      // notify job executor
-      synchronized (jobExecutor) {
-        jobExecutor.notify();
-      }
     }
     catch (RuntimeException e) {
       jbpmContext.setRollbackOnly();
@@ -246,6 +237,10 @@
         log.warn("failed to unlock " + job, e);
       }
     }
+    // notify job executor
+    synchronized (jobExecutor) {
+      jobExecutor.notify();
+    }
   }
 
   /** @deprecated responsibility moved to DispatcherThread */

Modified: jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml
===================================================================
--- jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2011-05-13 15:04:52 UTC (rev 6919)
+++ jbpm3/branches/3.2.10.SP/core/src/main/resources/org/jbpm/db/hibernate.queries.hbm.xml	2011-05-13 15:09:03 UTC (rev 6920)
@@ -401,6 +401,16 @@
     ]]>
   </query>
 
+  <query name="JobSession.releaseLockedJobs"> <!-- bulk update run by JobSession.releaseLockedJobs(String); leaves jobs with no retries left, or suspended jobs, locked -->
+    <![CDATA[
+      update org.jbpm.job.Job job
+      set job.lockOwner = null, job.lockTime = null
+      where (job.lockOwner = :lockOwner)
+      and job.retries > 0
+      and job.isSuspended = false
+    ]]>
+  </query>
+
   <!-- related to Tasks            -->
   <!-- ########################### -->
 



More information about the jbpm-commits mailing list