Author: marco.rietveld
Date: 2011-05-19 11:34:47 -0400 (Thu, 19 May 2011)
New Revision: 6942
Modified:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
Log:
Experiment with ThreadMXBean in order to see if this helps to run JobExecutor tests.
Modified:
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java
===================================================================
---
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java 2011-05-19
13:03:08 UTC (rev 6941)
+++
jbpm3/branches/jbpm-3.2-soa/core/src/main/java/org/jbpm/db/AbstractDbTestCase.java 2011-05-19
15:34:47 UTC (rev 6942)
@@ -21,7 +21,11 @@
*/
package org.jbpm.db;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,7 +35,6 @@
import org.hibernate.cfg.Environment;
import org.hibernate.criterion.Projections;
import org.hibernate.criterion.Restrictions;
-
import org.jbpm.AbstractJbpmTestCase;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
@@ -205,9 +208,10 @@
}
}
- protected void startJobExecutor() {
+ protected JobExecutor startJobExecutor() { // now hands the started executor back to the caller
jobExecutor = getJbpmConfiguration().getJobExecutor();
jobExecutor.start();
+ return jobExecutor; // the same instance that was just stored in the jobExecutor field and started
}
/**
@@ -260,6 +264,72 @@
}
}
+ /**
+  * Adds up the user-mode CPU time reported by the JVM's {@link ThreadMXBean} for the given
+  * threads.
+  *
+  * @param threadIdSet ids of the threads whose user time is summed
+  * @return the combined user time in milliseconds, or 0 when the JVM does not support
+  * thread CPU time measurement
+  */
+ private long getUserTimeInMilliseconds( HashSet<Long> threadIdSet ) {
+   ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+   if ( threadBean.isThreadCpuTimeSupported() ) {
+     long userTimeNanos = 0;
+     for ( Long id : threadIdSet ) {
+       long nanos = threadBean.getThreadUserTime( id.longValue() );
+       // -1 means the bean has no measurement for this thread; leave it out of the sum
+       if ( nanos != -1 ) {
+         userTimeNanos += nanos;
+       }
+     }
+     // nanoseconds to milliseconds
+     return userTimeNanos / 1000000;
+   }
+   return 0L;
+ }
+
+ protected void newWaitForJobs(final long timeout, Map threadMap) {
+
+ // Gather thread Id's
+ Iterator<Thread> threadMapIter = threadMap.values().iterator();
+ HashSet<Long> threadIdSet = new HashSet<Long>();
+ while(threadMapIter.hasNext()) {
+ threadIdSet.add(threadMapIter.next().getId());
+ }
+
+ long waitPeriod = 500;
+ for (int currentCount, previousCount = 0; (currentCount = getNbrOfJobsAvailable())
> 0;) {
+
+ long elapsedTime = getUserTimeInMilliseconds(threadIdSet);
+
+ if (elapsedTime > timeout) {
+ fail("test execution exceeded threshold of " + timeout + "
ms");
+ }
+
+ if (currentCount < previousCount) {
+ long elapsedTimePerJob = (elapsedTime) / (previousCount - currentCount);
+ waitPeriod = currentCount * elapsedTimePerJob;
+ if (waitPeriod < 500) waitPeriod = 500;
+ }
+
+ if (waitPeriod > 5000) {
+ waitPeriod = 5000;
+ }
+ else {
+ long remainingTime = timeout - elapsedTime;
+ waitPeriod = (waitPeriod > remainingTime) ? remainingTime : waitPeriod;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("waiting " + waitPeriod + " ms for " + currentCount
+ " jobs");
+ }
+ try {
+ Thread.sleep(waitPeriod);
+ }
+ catch (InterruptedException e) {
+ fail("wait for jobs got interrupted");
+ }
+
+ previousCount = currentCount;
+ }
+ }
+
protected int getNbrOfJobsAvailable() {
if (session != null) {
return getJobCount(session);
@@ -305,7 +375,7 @@
* time has elapsed. The current jBPM context is closed before waiting and a new one is
opened
* after processing the jobs.
*/
- protected void processJobs(long timeout) {
+ protected void oldProcessJobs(long timeout) {
closeJbpmContext();
try {
startJobExecutor();
@@ -317,6 +387,18 @@
}
}
+ protected void processJobs(long timeout) {
+ closeJbpmContext();
+ try {
+ JobExecutor jobExecutor = startJobExecutor();
+ newWaitForJobs(timeout, jobExecutor.getThreads());
+ }
+ finally {
+ stopJobExecutor();
+ createJbpmContext();
+ }
+ }
+
protected void stopJobExecutor() {
if (jobExecutor != null) {
try {
Show replies by date