[jbpm-commits] JBoss JBPM SVN: r6942 - jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 19 11:34:47 EDT 2011


Author: marco.rietveld
Date: 2011-05-19 11:34:47 -0400 (Thu, 19 May 2011)
New Revision: 6942

Modified:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
Log:
Experiment with ThreadMXBean in order to see if this helps to run JobExecutor tests.

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2011-05-19 13:03:08 UTC (rev 6941)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2011-05-19 15:34:47 UTC (rev 6942)
@@ -21,7 +21,11 @@
  */
 package org.jbpm.db;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +35,6 @@
 import org.hibernate.cfg.Environment;
 import org.hibernate.criterion.Projections;
 import org.hibernate.criterion.Restrictions;
-
 import org.jbpm.AbstractJbpmTestCase;
 import org.jbpm.JbpmConfiguration;
 import org.jbpm.JbpmContext;
@@ -205,9 +208,10 @@
     }
   }
 
-  protected void startJobExecutor() {
+  protected JobExecutor startJobExecutor() { // fetches the configured job executor and starts it
     jobExecutor = getJbpmConfiguration().getJobExecutor();
     jobExecutor.start();
+    return jobExecutor; // the started executor is also kept in the jobExecutor field
   }
 
   /**
@@ -260,6 +264,72 @@
     }
   }
 
+  /**
+   * Adds up the user-mode CPU time consumed by the threads with the given ids.
+   *
+   * @return the summed time in milliseconds, or 0 if thread CPU time is not supported
+   */
+  private long getUserTimeInMilliseconds( HashSet<Long> threadIdSet ) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    if (threadBean.isThreadCpuTimeSupported()) {
+      long nanoSum = 0;
+      for (Long id : threadIdSet) {
+        long nanos = threadBean.getThreadUserTime(id);
+        // a reading of -1 carries no time and is left out of the sum
+        nanoSum += (nanos == -1) ? 0 : nanos;
+      }
+      // nanoseconds to milliseconds
+      return nanoSum / 1000000;
+    }
+    return 0L;
+  }
+
+  /**
+   * Polls the job count until no jobs remain, failing the test once the elapsed time exceeds
+   * <code>timeout</code> ms. Elapsed time is the user CPU time of the given threads, or
+   * wall-clock time when the JVM does not support thread CPU time measurement.
+   *
+   * @param timeout maximum elapsed time in milliseconds
+   * @param threadMap map whose values are the {@link Thread}s to measure
+   */
+  protected void newWaitForJobs(final long timeout, Map threadMap) {
+    // gather thread ids; the raw map is walked as Objects to avoid an unchecked conversion
+    HashSet<Long> threadIdSet = new HashSet<Long>();
+    for (Object thread : threadMap.values()) {
+      threadIdSet.add(((Thread) thread).getId());
+    }
+    // without CPU time support the measured time stays 0 and the timeout would never fire
+    boolean cpuTimed = ManagementFactory.getThreadMXBean().isThreadCpuTimeSupported();
+    long startTime = System.currentTimeMillis();
+    long waitPeriod = 500;
+    for (int currentCount, previousCount = 0; (currentCount = getNbrOfJobsAvailable()) > 0;) {
+      long elapsedTime = cpuTimed ? getUserTimeInMilliseconds(threadIdSet)
+        : System.currentTimeMillis() - startTime;
+      if (elapsedTime > timeout) {
+        fail("test execution exceeded threshold of " + timeout + " ms");
+      }
+      if (currentCount < previousCount) {
+        // scale the wait by the elapsed time per job completed since the previous poll
+        long elapsedTimePerJob = elapsedTime / (previousCount - currentCount);
+        waitPeriod = Math.max(500, currentCount * elapsedTimePerJob);
+      }
+      // never sleep longer than 5 seconds or past the remaining time budget
+      waitPeriod = Math.min(Math.min(waitPeriod, 5000), timeout - elapsedTime);
+
+      if (log.isDebugEnabled()) {
+        log.debug("waiting " + waitPeriod + " ms for " + currentCount + " jobs");
+      }
+      try {
+        Thread.sleep(waitPeriod);
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
+        fail("wait for jobs got interrupted");
+      }
+      previousCount = currentCount;
+    }
+  }
+  
   protected int getNbrOfJobsAvailable() {
     if (session != null) {
       return getJobCount(session);
@@ -305,7 +375,7 @@
    * time has elapsed. The current jBPM context is closed before waiting and a new one is opened
    * after processing the jobs.
    */
-  protected void processJobs(long timeout) {
+  protected void oldProcessJobs(long timeout) {
     closeJbpmContext();
     try {
       startJobExecutor();
@@ -317,6 +387,18 @@
     }
   }
 
+  /** Runs the job executor until no jobs remain or the timeout is exceeded, then reopens the context. */
+  protected void processJobs(long timeout) {
+    closeJbpmContext();
+    try {
+      newWaitForJobs(timeout, startJobExecutor().getThreads());
+    }
+    finally {
+      stopJobExecutor();
+      createJbpmContext();
+    }
+  }
+
   protected void stopJobExecutor() {
     if (jobExecutor != null) {
       try {



More information about the jbpm-commits mailing list