[jbpm-commits] JBoss JBPM SVN: r6869 - jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 01:45:19 EST 2010


Author: alex.guizar at jboss.com
Date: 2010-12-16 01:45:19 -0500 (Thu, 16 Dec 2010)
New Revision: 6869

Modified:
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
Log:
JBPM-2959: port forward job executor improvements devised while backporting the dispatcher thread:
replace the ExecutorService.submit() call with execute(), since the latter does not return a Future;
replace ArrayBlockingQueue with SynchronousQueue to avoid dispatching jobs that no executor is demanding

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java	2010-12-15 11:03:13 UTC (rev 6868)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/DispatcherThread.java	2010-12-16 06:45:19 UTC (rev 6869)
@@ -118,8 +118,8 @@
 
   protected void submitAcquiredJobDbids(Collection<Long> jobDbids) {
     log.debug("submitting jobs "+jobDbids);
-    jobExecutor.getThreadPool().submit(
-        new JobParcel(jobExecutor.getCommandExecutor(), jobDbids));
+    jobExecutor.getExecutorService().execute(new JobParcel(jobExecutor.getCommandExecutor(),
+      jobDbids));
     log.trace("jobs "+jobDbids+" were submitted");
   }
 

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java	2010-12-15 11:03:13 UTC (rev 6868)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExecutor.java	2010-12-16 06:45:19 UTC (rev 6869)
@@ -28,7 +28,7 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -67,7 +67,7 @@
   
   private boolean isActive;
 
-  private ExecutorService threadPool;
+  private ExecutorService executorService;
   private DispatcherThread dispatcherThread;
   
   private List<JobHistoryEntry> history = new ArrayList<JobHistoryEntry>();
@@ -83,11 +83,11 @@
       
       isActive = true;
       log.trace("starting thread pool for job executor '"+name+"'...");
-      threadPool = new ThreadPoolExecutor(nbrOfThreads, 
+      executorService = new ThreadPoolExecutor(nbrOfThreads, 
                                           nbrOfThreads, 
                                           0L, 
                                           TimeUnit.MILLISECONDS,
-                                          new ArrayBlockingQueue<Runnable>(nbrOfThreads), 
+                                          new SynchronousQueue<Runnable>(), 
                                           JobRejectionHandler.INSTANCE);
 
       log.trace("starting dispatcher thread for job executor '"+name+"'...");
@@ -114,7 +114,7 @@
 
   /** stops with join set to false.
    * @see #stop(boolean) */ 
-  public synchronized void stop() {
+  public void stop() {
     stop(false);
   }
   
@@ -126,14 +126,15 @@
    * are in the middle of executing a job and they may finish after this method returned.
    */
   public synchronized void stop(boolean join) {
-    log.debug("stopping job executor");
     if (isActive) {
       isActive = false;
+
+      log.debug("stopping job executor");
       dispatcherThread.deactivate(join);
-      threadPool.shutdown();
+      executorService.shutdown();
       if (join) {
         try {
-          threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
           log.trace("joining "+getName()+" got interrupted");
         }
@@ -151,7 +152,7 @@
     }
   }
 
-  static String getHostName() {
+  private static String getHostName() {
     try {
       return InetAddress.getLocalHost().getHostAddress();
     } catch (UnknownHostException e) {
@@ -159,8 +160,8 @@
     }
   }
 
-  protected ExecutorService getThreadPool() {
-    return threadPool;
+  protected ExecutorService getExecutorService() {
+    return executorService;
   }
 
   // getters //////////////////////////////////////////////////////////////////



More information about the jbpm-commits mailing list