[jbpm-commits] JBoss JBPM SVN: r5365 - in jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal: jobexecutor and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 28 22:01:11 EDT 2009


Author: alex.guizar at jboss.com
Date: 2009-07-28 22:01:11 -0400 (Tue, 28 Jul 2009)
New Revision: 5365

Added:
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java
Removed:
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java
Modified:
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
Log:
[JBPM-2402] Verify job executor threads in case of errors.
Replace our custom thread pool with Executors.newFixedThreadPool, which recreates terminated threads.

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java	2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/ExecuteJobCmd.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -85,7 +85,7 @@
 		        throw new JbpmException("job took too long: lock expired "+(currentTime-lockExpiration)+"ms ago");
 		      }
 	      }
-	    } catch (Throwable exception) {
+	    } catch (Exception exception) {
 	      log.error("exception while executing '"+job+"'", exception);
 	      handleJobExecutionException(environment, job, exception);
 	    } finally {
@@ -102,7 +102,7 @@
    * Transaction.EVENT_AFTERCOMPLETION (after the job locks of the current transaction are 
    * released).  Then the command will update the job with the exception details in a separate 
    * transaction. */
-  protected void handleJobExecutionException(Environment environment, JobImpl<?> job, Throwable exception) {
+  protected void handleJobExecutionException(Environment environment, JobImpl<?> job, Exception exception) {
     Transaction transaction = environment.get(Transaction.class);
     // transaction.setRollbackOnly();
 

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java	2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -40,7 +40,7 @@
   private static final Log log = Log.getLog(DispatcherThread.class.getName());
   
   protected JobExecutor jobExecutor;
-  protected   boolean isActive = true;
+  protected volatile boolean isActive = true;
   protected boolean checkForNewJobs;
   protected int currentIdleInterval;
   protected Object semaphore = new Object();
@@ -55,7 +55,7 @@
   }
 
   public void run() {
-    log.info("starting...");
+    log.info("starting " + getName());
     currentIdleInterval = jobExecutor.getIdleMillis();
     try {
       while (isActive) {
@@ -69,7 +69,7 @@
           // no exception so resetting the currentIdleInterval
           currentIdleInterval = jobExecutor.getIdleMillis();
           if ((acquiredJobDbids != null) && (!acquiredJobDbids.isEmpty())) {
-            putAcquiredJobDbidsOnQueue(acquiredJobDbids);
+            submitAcquiredJobDbids(acquiredJobDbids);
             log.debug("added jobs "+acquiredJobDbids+" to the queue");
 
           } else if (isActive) {
@@ -88,7 +88,7 @@
           }
 
         } catch (InterruptedException e) {
-          log.info((isActive ? "active" : "inactivated") + " job dispatcher thread '" + getName() + "' got interrupted");
+          log.info((isActive ? "active" : "inactive") + " job dispatcher thread '" + getName() + "' got interrupted");
         } catch (Exception e) {
           log.error("exception in job executor thread. waiting " + currentIdleInterval + " milliseconds", e);
           try {
@@ -103,24 +103,16 @@
           currentIdleInterval = currentIdleInterval * 2;
         }
       }
-    } catch (Throwable t) {
-      t.printStackTrace();
     } finally {
       log.info(getName() + " leaves cyberspace");
     }
   }
 
-  protected void putAcquiredJobDbidsOnQueue(Collection<Long> acquiredJobDbids) {
-    log.debug("pushing jobs on the queue "+acquiredJobDbids);
-    while (acquiredJobDbids!=null) {
-      try {
-        jobExecutor.getJobDbidsQueue().put(acquiredJobDbids);
-        log.trace("jobs "+acquiredJobDbids+" were put on the queue");
-        acquiredJobDbids = null;
-      } catch (InterruptedException e) {
-        log.trace("putting acquired job dbids got interrupted. retrying...");
-      }
-    }
+  protected void submitAcquiredJobDbids(Collection<Long> jobDbids) {
+    log.debug("submitting jobs "+jobDbids);
+    jobExecutor.getThreadPool().submit(
+        new JobParcel(jobExecutor.getCommandExecutor(), jobDbids));
+    log.trace("jobs "+jobDbids+" were submitted");
   }
 
   protected Collection<Long> acquireJobs() {
@@ -161,13 +153,11 @@
       isActive = false;
       interrupt();
       if (join) {
-        while (isAlive()) {
-          try {
-            log.debug("joining "+getName());
-            join();
-          } catch (InterruptedException e) {
-            log.trace("joining "+getName()+" got interrupted");
-          }
+        try {
+          log.debug("joining "+getName());
+          join();
+        } catch (InterruptedException e) {
+          log.trace("joining "+getName()+" got interrupted");
         }
       }
     } else {

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java	2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -23,12 +23,14 @@
 
 import java.io.Serializable;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.jbpm.api.JbpmException;
 import org.jbpm.internal.log.Log;
@@ -41,6 +43,7 @@
  * @author Tom Baeyens, Guillaume Porcher
  */
 public class JobExecutor implements Serializable {
+  
   private static final Log log = Log.getLog(JobExecutor.class.getName());
 
   private static final long serialVersionUID = 1L;
@@ -60,16 +63,11 @@
   Command<Collection<Long>> acquireJobsCommand;
   Command<Date> nextDueDateCommand;
   
-  boolean isActive = false;
+  boolean isActive;
 
-  JobExecutorThreadPool jobExecutorThreadPool;
+  ExecutorService threadPool;
+  DispatcherThread dispatcherThread;
   
-  DispatcherThread dispatcherThread = null;
-  
-  /** queue to dispatch collections of jobDbids to the JobExecutorThreads, which are 
-   * competing readers. */
-  BlockingQueue<Collection<Long>> jobDbidsQueue = null;
-
   List<JobHistoryEntry> history = new ArrayList<JobHistoryEntry>();
   
   /** starts the {@link DispatcherThread} and {@link JobExecutorThread}s for this job executor */
@@ -81,14 +79,9 @@
       acquireJobsCommand = new AcquireJobsCmd(this);
       nextDueDateCommand = new GetNextDueDateCmd(this);
       
-      // the max capacity of the jobDbidsQueue is set to nbrOfJobExecutorThreads.  
-      // That way, the distpatcher thread will be stalled if enough jobs are acquired.
-      jobDbidsQueue = new ArrayBlockingQueue<Collection<Long>>(nbrOfThreads, true);
-
       isActive = true;
-      log.trace("starting job executor threads for job executor '"+name+"'...");
-      jobExecutorThreadPool = new JobExecutorThreadPool(this);
-      jobExecutorThreadPool.start();
+      log.trace("starting thread pool for job executor '"+name+"'...");
+      threadPool = Executors.newFixedThreadPool(nbrOfThreads);
 
       log.trace("starting dispatcher thread for job executor '"+name+"'...");
       dispatcherThread = new DispatcherThread(this);
@@ -117,24 +110,19 @@
     if (isActive) {
       isActive = false;
       dispatcherThread.deactivate(true);
-      waitTillQueueEmpty();
-      jobExecutorThreadPool.deactivate(join);
+      threadPool.shutdown();
+      if (join) {
+        try {
+          threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          log.trace("joining "+getName()+" got interrupted");
+        }
+      }
     } else {
       log.trace("ignoring stop: job executor '"+name+"' not started");
     }
   }
 
-  protected void waitTillQueueEmpty() {
-    while (! jobDbidsQueue.isEmpty()) {
-      log.trace("waiting for job-id-queue to become empty");
-      try {
-        Thread.sleep(200);
-      } catch (InterruptedException e) {
-        log.trace("waiting for job-id-queue to become empty got interrupted");
-      }
-    }
-  }
-
   public void jobWasAdded() {
     if ( (dispatcherThread!=null)
          && (dispatcherThread.isActive())
@@ -143,16 +131,16 @@
     }
   }
 
-  protected static String getHostName() {
+  static String getHostName() {
     try {
       return InetAddress.getLocalHost().getHostAddress();
-    } catch (Exception e) {
-      return "unknown";
+    } catch (UnknownHostException e) {
+      return "localhost";
     }
   }
 
-  protected BlockingQueue<Collection<Long>> getJobDbidsQueue() {
-    return jobDbidsQueue;
+  protected ExecutorService getThreadPool() {
+    return threadPool;
   }
 
   // getters //////////////////////////////////////////////////////////////////

Deleted: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java	2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThread.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -1,101 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.pvm.internal.jobexecutor;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-
-import org.jbpm.internal.log.Log;
-import org.jbpm.pvm.internal.cmd.CommandService;
-import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
-
-
-/**
- * @author Tom Baeyens, Guillaume Porcher
- */
-public class JobExecutorThread extends Thread {
-  
-  private static final Log log = Log.getLog(JobExecutorThread.class.getName());
-  
-  JobExecutor jobExecutor; 
-  boolean isActive = true;
-
-  public JobExecutorThread(String name, JobExecutor jobExecutor) {
-    super(name);
-    this.jobExecutor = jobExecutor;
-  }
-
-  public void run() {
-    log.info("starting...");
-    try {
-      BlockingQueue<Collection<Long>> queue = jobExecutor.getJobDbidsQueue();
-      if (queue == null) {
-    	  log.debug("no queue to take jobs from");
-      } else {
-        while (isActive) {
-          try {
-            log.trace("taking jobs from queue");
-            Collection<Long> jobDbids = null;
-            jobDbids = queue.take();
-            log.debug("took job(s) "+jobDbids+" from queue");
-
-            for (Long jobDbid: jobDbids) {
-              CommandService commandService = jobExecutor.getCommandExecutor();
-              commandService.execute(new ExecuteJobCmd(jobDbid));
-            }
-          } catch (InterruptedException e) {
-            log.trace("waiting for acquired jobs got interrupted");
-          } catch (Exception e) {
-            log.error("exception in job executor thread", e);
-          }
-        }
-      }
-    } catch (Throwable t) {
-      t.printStackTrace();
-    } finally {
-      log.info(getName()+" leaves cyberspace");
-    }
-  }
-  
-  public void deactivate() {
-    deactivate(false);
-  }
-  
-  public void deactivate(boolean join) {
-    if (isActive) {
-      log.trace("deactivating "+getName());
-      isActive = false;
-      interrupt();
-      if (join) {
-        while (isAlive()) {
-          try {
-            join();
-          } catch (InterruptedException e) {
-            log.trace("joining "+getName()+" got interrupted");
-          }
-        }
-      }
-    } else {
-      log.trace("ignoring deactivate: "+getName()+" is not active");
-    }
-  }
-}

Deleted: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java	2009-07-29 01:30:33 UTC (rev 5364)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorThreadPool.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.pvm.internal.jobexecutor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jbpm.internal.log.Log;
-
-/**
- * @author Tom Baeyens
- */
-public class JobExecutorThreadPool {
-  
-  private static final Log log = Log.getLog(JobExecutorThreadPool.class.getName());
-
-  List<JobExecutorThread> jobExecutorThreads = new ArrayList<JobExecutorThread>();
-  JobExecutor jobExecutor;
-
-  public JobExecutorThreadPool(JobExecutor jobExecutor) {
-    this.jobExecutor = jobExecutor;
-  }
-
-  public JobExecutorThread startThread() {
-    String threadName = getNextThreadName();
-    JobExecutorThread jobExecutorThread = new JobExecutorThread(threadName, jobExecutor);
-    jobExecutorThreads.add(jobExecutorThread);
-    log.trace("starting "+threadName);
-    jobExecutorThread.start();
-    return jobExecutorThread;
-  }
-
-  public JobExecutorThread stopThread() {
-    JobExecutorThread thread = null;
-    int lastIndex = jobExecutorThreads.size()-1;
-    if (lastIndex>=0) {
-      thread = (JobExecutorThread) jobExecutorThreads.remove(lastIndex);
-      thread.deactivate();
-    }
-    return thread;
-  }
-
-  public void start() {
-    for (int i=0; i<jobExecutor.getNbrOfThreads(); i++) {
-      startThread();
-    }
-  }
-
-  public void deactivate() {
-    deactivate(false);
-  }
-
-  public void deactivate(boolean join) {
-    for (JobExecutorThread jobExecutorThread : jobExecutorThreads) {
-      jobExecutorThread.deactivate(join);
-    }
-  }
-
-  protected String getNextThreadName() {
-    return "JobExecutorThread" + jobExecutorThreads.size();
-  }
-
-  public List<JobExecutorThread> getJobExecutorThreads() {
-    return jobExecutorThreads;
-  }
-}

Added: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobParcel.java	2009-07-29 02:01:11 UTC (rev 5365)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.Collection;
+
+import org.jbpm.internal.log.Log;
+import org.jbpm.pvm.internal.cmd.CommandService;
+import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
+
+/**
+ * @author Alejandro Guizar
+ */
+public class JobParcel implements Runnable {
+
+  private static final Log log = Log.getLog(JobParcel.class.getName());
+
+  private final CommandService commandService;
+  private final Collection<Long> jobDbids;
+
+  public JobParcel(CommandService commandService, Collection<Long> jobDbids) {
+    this.commandService = commandService;
+    this.jobDbids = jobDbids;
+  }
+
+  public void run() {
+    try {
+      for (Long jobDbid : jobDbids) {
+        commandService.execute(new ExecuteJobCmd(jobDbid));
+      }
+    } catch (RuntimeException e) {
+      log.error("exception in job block", e);
+    }
+  }
+}



More information about the jbpm-commits mailing list