[jbpm-commits] JBoss JBPM SVN: r6945 - jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 19 11:46:55 EDT 2011


Author: marco.rietveld
Date: 2011-05-19 11:46:55 -0400 (Thu, 19 May 2011)
New Revision: 6945

Modified:
   jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
Log:
Experiment with ThreadMXBean: second attempt. 

Modified: jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2011-05-19 15:43:13 UTC (rev 6944)
+++ jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java	2011-05-19 15:46:55 UTC (rev 6945)
@@ -21,7 +21,11 @@
  */
 package org.jbpm.db;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +35,6 @@
 import org.hibernate.cfg.Environment;
 import org.hibernate.criterion.Projections;
 import org.hibernate.criterion.Restrictions;
-
 import org.jbpm.AbstractJbpmTestCase;
 import org.jbpm.JbpmConfiguration;
 import org.jbpm.JbpmContext;
@@ -205,9 +208,10 @@
     }
   }
 
-  protected void startJobExecutor() {
+  protected JobExecutor startJobExecutor() {
     jobExecutor = getJbpmConfiguration().getJobExecutor();
     jobExecutor.start();
+    return jobExecutor;
   }
 
   /**
@@ -260,6 +264,73 @@
     }
   }
 
+  private long getUserTimeInMilliseconds( HashSet threadIdSet ) {
+    ThreadMXBean bean = ManagementFactory.getThreadMXBean( );
+    if ( ! bean.isThreadCpuTimeSupported( ) ) {
+        return 0L;
+    } 
+
+    long totalThreadTimeInNanoSeconds = 0;
+    Object [] threadId = new Long[0];
+    threadId = threadIdSet.toArray(threadId);
+
+    for ( int i = 0; i < threadId.length; ++i ) {
+      long threadTime = bean.getThreadUserTime( (Long) threadId[i] );
+      if ( threadTime != -1 ) {
+          totalThreadTimeInNanoSeconds += threadTime;
+      }
+    }
+    
+    return totalThreadTimeInNanoSeconds/1000000;
+  }
+
+  protected void newWaitForJobs(final long timeout, Map threadMap) {
+
+    // Gather thread Id's
+    Iterator threadMapIter = threadMap.values().iterator();
+    HashSet threadIdSet = new HashSet();
+    while(threadMapIter.hasNext()) {
+      Thread thisThread = (Thread) threadMapIter.next();
+      threadIdSet.add(thisThread.getId());
+    }
+    
+    long waitPeriod = 500;
+    for (int currentCount, previousCount = 0; (currentCount = getNbrOfJobsAvailable()) > 0;) {
+
+      long elapsedTime = getUserTimeInMilliseconds(threadIdSet);
+      
+      if (elapsedTime > timeout) {
+        fail("test execution exceeded threshold of " + timeout + " ms");
+      }
+
+      if (currentCount < previousCount) {
+        long elapsedTimePerJob = (elapsedTime) / (previousCount - currentCount);
+        waitPeriod = currentCount * elapsedTimePerJob;
+        if (waitPeriod < 500) waitPeriod = 500;
+      }
+
+      if (waitPeriod > 5000) {
+        waitPeriod = 5000;
+      }
+      else {
+        long remainingTime = timeout - elapsedTime;
+        waitPeriod = (waitPeriod > remainingTime) ? remainingTime : waitPeriod;
+      }
+      
+      if (log.isDebugEnabled()) {
+        log.debug("waiting " + waitPeriod + " ms for " + currentCount + " jobs");
+      }
+      try {
+        Thread.sleep(waitPeriod);
+      }
+      catch (InterruptedException e) {
+        fail("wait for jobs got interrupted");
+      }
+
+      previousCount = currentCount;
+    }
+  }
+  
   protected int getNbrOfJobsAvailable() {
     if (session != null) {
       return getJobCount(session);
@@ -305,7 +376,7 @@
    * time has elapsed. The current jBPM context is closed before waiting and a new one is opened
    * after processing the jobs.
    */
-  protected void processJobs(long timeout) {
+  protected void oldProcessJobs(long timeout) {
     closeJbpmContext();
     try {
       startJobExecutor();
@@ -317,6 +388,18 @@
     }
   }
 
+  protected void processJobs(long timeout) {
+    closeJbpmContext();
+    try {
+      JobExecutor jobExecutor = startJobExecutor();
+      newWaitForJobs(timeout, jobExecutor.getThreads());
+    }
+    finally {
+      stopJobExecutor();
+      createJbpmContext();
+    }
+  }
+
   protected void stopJobExecutor() {
     if (jobExecutor != null) {
       try {



More information about the jbpm-commits mailing list