[jbpm-commits] JBoss JBPM SVN: r5208 - in jbpm3/branches/jbpm-3.2-soa/modules/core/src: test/java/org/jbpm and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 3 05:14:48 EDT 2009


Author: mputz
Date: 2009-07-03 05:14:48 -0400 (Fri, 03 Jul 2009)
New Revision: 5208

Added:
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
Modified:
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
Log:
Fix for JBPM-2375 - JobExecutorThread is terminated after an Error: LockMonitorThread restarts terminated JobExecutorThreads.

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2009-07-03 09:03:10 UTC (rev 5207)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/JobExecutor.java	2009-07-03 09:14:48 UTC (rev 5208)
@@ -48,7 +48,8 @@
       for (int i = 0; i < nbrOfThreads; i++) {
         startThread();
       }
-      lockMonitorThread = new LockMonitorThread(jbpmConfiguration, lockMonitorInterval, maxLockTime, lockBufferTime);
+      lockMonitorThread = new LockMonitorThread(getLockMonitorThreadName(), this);
+      lockMonitorThread.start();
       isStarted = true;
     }
     else {
@@ -92,9 +93,30 @@
 
     if (lockMonitorThread != null) lockMonitorThread.join();
   }
+  
+  public void ensureThreadsAreActive() {
+    List deadThreads = new ArrayList(); 
+    for (Iterator i = threads.values().iterator(); i.hasNext();) {
+      Thread thread = (Thread) i.next();
+      if (!thread.isAlive()) {      
+        if(log.isDebugEnabled())
+        {
+    	  log.debug("detected dead thread '" + thread.getName() + "'");
+        }
+        deadThreads.add(thread.getName());
+        i.remove();
+      }
+    }
+    for (int i = 0; i<deadThreads.size(); i++) {
+        startThread((String)deadThreads.get(i));
+    }
+  }
+  
+  protected void startThread() {
+    startThread(getNextThreadName());
+  }
 
-  protected synchronized void startThread() {
-    String threadName = getNextThreadName();
+  protected synchronized void startThread(String threadName) {
     Thread thread = createThread(threadName);
     threads.put(threadName, thread);
 
@@ -117,6 +139,10 @@
   private String getThreadName(int index) {
     return name + ":" + getHostAddress() + ":" + index;
   }
+  
+  private String getLockMonitorThreadName() {
+    return name + ':' + LockMonitorThread.DEFAULT_NAME + '@' + getHostAddress();     
+  }
 
   private static String getHostAddress() {
     if (hostName == null) {

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2009-07-03 09:03:10 UTC (rev 5207)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/job/executor/LockMonitorThread.java	2009-07-03 09:14:48 UTC (rev 5208)
@@ -16,57 +16,69 @@
 
 public class LockMonitorThread extends Thread {
 
-  final JbpmConfiguration jbpmConfiguration;
-  final int lockMonitorInterval;
-  final int maxLockMonitorInterval;
-  final int maxLockTime;
-  final int lockBufferTime;
+  public static final String DEFAULT_NAME = "LockMonitorThread";
 
+  JbpmConfiguration jbpmConfiguration;
+  int lockMonitorInterval;
+  int maxLockTime;
+  int lockBufferTime;
+
   int currentLockMonitorInterval;
   volatile boolean isActive = true;
+  
+  public LockMonitorThread(JobExecutor jobExecutor) {
+    this(DEFAULT_NAME, jobExecutor);
+  }
 
+  public LockMonitorThread(String name, JobExecutor jobExecutor) {
+    super(name);
+    jbpmConfiguration = jobExecutor.getJbpmConfiguration();
+    lockMonitorInterval = jobExecutor.getLockMonitorInterval();
+    maxLockTime = jobExecutor.getMaxLockTime();
+    lockBufferTime = jobExecutor.getLockBufferTime();
+  }  
+
+  /** @deprecated As of jBPM 3.2.6, replaced by {@link #LockMonitorThread(JobExecutor)} */
   public LockMonitorThread(JbpmConfiguration jbpmConfiguration, int lockMonitorInterval,
       int maxLockTime, int lockBufferTime) {
     this.jbpmConfiguration = jbpmConfiguration;
     this.lockMonitorInterval = lockMonitorInterval;
-    this.maxLockMonitorInterval = jbpmConfiguration.getJobExecutor().getMaxIdleInterval();
     this.maxLockTime = maxLockTime;
     this.lockBufferTime = lockBufferTime;
   }
 
   public void run() {
-    currentLockMonitorInterval = lockMonitorInterval;
-    while (isActive) {
-      try {
-        unlockOverdueJobs();
-        if (isActive) {
-          sleep(currentLockMonitorInterval);
-        }
-        // no exception, reset current lock monitor interval
-        currentLockMonitorInterval = lockMonitorInterval;
-      }
-      catch (RuntimeException e) {
-        log.error("exception in lock monitor thread. waiting " +
-            currentLockMonitorInterval +
-            " milliseconds", e);
+    try {
+      while (isActive) {
         try {
-          sleep(currentLockMonitorInterval);
+          unlockOverdueJobs();
+          jbpmConfiguration.getJobExecutor().ensureThreadsAreActive();
+          if (isActive && lockMonitorInterval > 0) {
+            sleep(lockMonitorInterval);
+          }
         }
-        catch (InterruptedException ie) {
-          log.debug("delay after exception got interrupted", ie);
+        catch (InterruptedException e) {
+          log.info("lock monitor thread '" + getName() + "' got interrupted");
         }
-        // after an exception, double the current lock monitor interval to prevent
-        // continuous exception generation when e.g. the db is unreachable
-        currentLockMonitorInterval <<= 1;
-        if (currentLockMonitorInterval > maxLockMonitorInterval || currentLockMonitorInterval < 0) {
-          currentLockMonitorInterval = maxLockMonitorInterval;
+        catch (Exception e) {
+          log.error("exception in lock monitor thread. waiting "
+              + lockMonitorInterval
+              + " milliseconds", e);
+          try {
+            sleep(lockMonitorInterval);
+          }
+          catch (InterruptedException e2) {
+            log.debug("delay after exception got interrupted", e2);
+          }
         }
       }
-      catch (InterruptedException e) {
-        log.info("lock monitor thread '" + getName() + "' got interrupted");
-      }
     }
-    log.info(getName() + " leaves cyberspace");
+    catch (Exception e) {
+      log.error("exception in lock monitor thread", e);
+    }
+    finally {
+      log.info(getName() + " leaves cyberspace");
+    }
   }
 
   protected void unlockOverdueJobs() {
@@ -78,7 +90,10 @@
       overdueJobs = jobSession.findJobsWithOverdueLockTime(threshold);
       for (Iterator i = overdueJobs.iterator(); i.hasNext();) {
         Job job = (Job) i.next();
-        log.debug("unlocking " + job + " owned by thread " + job.getLockOwner());
+    	if(log.isDebugEnabled())
+    	{
+    		log.debug("unlocking " + job + " owned by thread " + job.getLockOwner());
+    	}        
         job.setLockOwner(null);
         job.setLockTime(null);
       }

Added: jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/test/java/org/jbpm/jbpm2375/JBPM2375Test.java	2009-07-03 09:14:48 UTC (rev 5208)
@@ -0,0 +1,121 @@
+package org.jbpm.jbpm2375;
+
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.db.AbstractDbTestCase;
+import org.jbpm.graph.def.ActionHandler;
+import org.jbpm.graph.def.Event;
+import org.jbpm.graph.def.EventCallback;
+import org.jbpm.graph.def.ProcessDefinition;
+import org.jbpm.graph.exe.ExecutionContext;
+import org.jbpm.graph.exe.ProcessInstance;
+import org.jbpm.job.executor.JobExecutor;
+
+/**
+ * Test if the JobExecutorThread recovers from an Error
+ * 
+ * @see <a href="https://jira.jboss.org/jira/browse/JBPM-2375">JBPM-2375</a>
+ * @author mputz at redhat.com
+ * @since 30-Jun-2009
+ */
+public class JBPM2375Test extends AbstractDbTestCase {
+
+  private static final int TEST_TIMEOUT = 10 * 1000;
+
+  private JobExecutor jobExecutor = new JobExecutor();
+  private long processDefinitionId;
+  
+  // a process definition with two timers moving the token forward
+  // the second state has an action associated with the node-enter event,
+  // which can simulate an Error condition by throwing a NoClassDefFoundError
+  private static final String PROCESS_DEFINITION = "<process-definition name='jbpm2375-timer-error-test'>"
+        + "  <event type='process-end'>"
+        + "    <action expression='#{eventCallback.processEnd}' />"
+        + "  </event>"  
+        + "  <start-state name='start'>"
+        + "    <transition to='state1' name='to_state1'/>"
+        + "  </start-state>"
+        + "  <state name='state1'>"
+        + "    <timer name='moveToDefaultEndAfter1second' duedate='1 second' transition='to_state2'/>"
+        + "    <transition to='state2' name='to_state2'/>"
+        + "  </state>"
+        + "  <state name='state2'>"
+        + "    <timer name='moveToEndAfter1second' duedate='1 second' transition='to_end'/>"
+        + "    <event type='node-enter'>"
+        + "      <action name='exceptionTest' class='"
+        + TimerExceptionAction.class.getName()
+        + "'>"
+        + "      </action>"   
+        + "    </event>"
+        + "    <transition to='end' name='to_end'/>"
+        + "  </state>"        
+        + "  <end-state name='end' />"
+        + "</process-definition>";
+  
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(PROCESS_DEFINITION);
+    jbpmContext.deployProcessDefinition(processDefinition);
+    newTransaction();
+    processDefinitionId = processDefinition.getId();
+
+    getJbpmConfiguration().getJobExecutor().setLockMonitorInterval(TEST_TIMEOUT/2);
+    startJobExecutor();
+    
+    
+  }
+
+  protected void tearDown() throws Exception {
+    stopJobExecutor();
+
+    graphSession.deleteProcessDefinition(processDefinitionId);
+
+    EventCallback.clear();
+    super.tearDown();
+  }  
+
+  // check if the process ends correctly if no Error is thrown
+  public void testTimerWithoutErrorAction() {
+    runTimerErrorAction(Boolean.FALSE);
+  }
+
+  // check if the process ends correctly if an Error is thrown in the ActionHandler
+  public void testTimerWithErrorAction() {
+    runTimerErrorAction(Boolean.TRUE);
+  }
+
+  private void runTimerErrorAction(Boolean withError) {
+
+    // kick off process instance
+    ProcessDefinition processDefinition = graphSession.loadProcessDefinition(processDefinitionId);
+    ProcessInstance processInstance = new ProcessInstance(processDefinition);
+    processInstance.getContextInstance().setVariable("eventCallback", new EventCallback());
+    processInstance.getContextInstance().setVariable("throwError", withError);
+    processInstance.signal();
+    jbpmContext.save(processInstance);
+
+    commitAndCloseSession();
+    try {
+      EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, TEST_TIMEOUT);
+
+      waitForJobs(TEST_TIMEOUT);
+                 
+    } finally {
+      beginSessionTransaction();
+    }
+
+  }  
+
+
+  public static class TimerExceptionAction implements ActionHandler {
+
+    private static final long serialVersionUID = 1L;
+
+    public void execute(ExecutionContext executionContext) throws Exception {
+      Boolean throwError = (Boolean)executionContext.getVariable("throwError");
+      if (throwError.booleanValue()) 
+        throw new NoClassDefFoundError("org.jbpm.no.class.Class");
+    }
+  }
+}




More information about the jbpm-commits mailing list