[jbpm-commits] JBoss JBPM SVN: r2257 - in jbpm3/trunk/modules: enterprise/jar/src/test/java/org/jbpm/msg/jms and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Sep 15 18:20:58 EDT 2008
Author: alex.guizar at jboss.com
Date: 2008-09-15 18:20:58 -0400 (Mon, 15 Sep 2008)
New Revision: 2257
Modified:
jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java
jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java
Log:
[JBPM-1709] replaced monitors with semaphores to make event notifications more reliable
Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java 2008-09-15 20:30:06 UTC (rev 2256)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java 2008-09-15 22:20:58 UTC (rev 2257)
@@ -22,6 +22,10 @@
package org.jbpm;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import javax.transaction.Status;
import javax.transaction.Synchronization;
@@ -36,6 +40,8 @@
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(EventCallback.class);
+ private static Map<String, Semaphore> eventSemaphores = new HashMap<String, Semaphore>();
+
public void processStart() {
registerNotification(Event.EVENTTYPE_PROCESS_START);
}
@@ -75,15 +81,14 @@
private static void registerNotification(final String event) {
Synchronization notification = new Synchronization() {
- public void beforeCompletion() {}
+ public void beforeCompletion() {
+ }
public void afterCompletion(int status) {
if (status == Status.STATUS_COMMITTED) {
- String canonEvent = event.intern();
- log.debug("notifying " + canonEvent + "@" + Integer.toHexString(System.identityHashCode(canonEvent)));
- synchronized (canonEvent) {
- canonEvent.notify();
- }
+ log.debug("sending '" + event + "' notification");
+ Semaphore eventSemaphore = getEventSemaphore(event);
+ eventSemaphore.release();
}
}
@@ -99,20 +104,39 @@
}
public static void waitForEvent(String event, long timeout) {
- String canonEvent = event.intern();
- long startTime = System.currentTimeMillis();
- log.debug("waiting for " + canonEvent + "@" + Integer.toHexString(System.identityHashCode(canonEvent)));
+ log.debug("waiting for " + event);
+ Semaphore eventSemaphore = getEventSemaphore(event);
try {
- synchronized (canonEvent) {
- canonEvent.wait(timeout);
+ if (eventSemaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+ log.debug("received '" + event + "' notification");
}
+ else {
+ log.warn("event '" + event + "' did not occur within " + timeout + " ms");
+ }
}
catch (InterruptedException e) {
// reassert interruption
Thread.currentThread().interrupt();
}
- if (System.currentTimeMillis() - startTime >= timeout) {
- log.debug("event '" + canonEvent + "' took longer than " + timeout + " ms to occur");
+ }
+
+ private static Semaphore getEventSemaphore(String event) {
+ synchronized (eventSemaphores) {
+ Semaphore semaphore = eventSemaphores.get(event);
+ if (semaphore == null) {
+ semaphore = new Semaphore(0);
+ eventSemaphores.put(event, semaphore);
+ }
+ return semaphore;
}
}
+
+ public static void clear() {
+ for (Map.Entry<String, Semaphore> entry : eventSemaphores.entrySet()) {
+ int permits = entry.getValue().drainPermits();
+ if (permits != 0) {
+ log.warn("event '" + entry.getKey() + "' has " + permits + " outstanding notifications");
+ }
+ }
+ }
}
\ No newline at end of file
Modified: jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java
===================================================================
--- jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java 2008-09-15 20:30:06 UTC (rev 2256)
+++ jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java 2008-09-15 22:20:58 UTC (rev 2257)
@@ -55,7 +55,7 @@
private static LocalCommandServiceHome commandServiceHome;
static final int processExecutionCount = 20;
- static final int maxWaitTime = 30000;
+ static final int maxWaitTime = 10000;
public static Test suite() throws Exception {
return new IntegrationTestSetup(JmsMessageTest.class, "enterprise-test.war");
@@ -77,6 +77,7 @@
protected void tearDown() throws Exception {
log.info("### " + getName() + " done ###");
+ EventCallback.clear();
commandService = null;
}
@@ -168,7 +169,7 @@
+ " <transition name='d' to='d' />"
+ " <transition name='e' to='e' />"
+ " </fork>"
- + " <node name='a' async='true'>"
+ + " <node name='a'>"
+ " <transition to='j' />"
+ " </node>"
+ " <node name='b' async='true'>"
@@ -214,10 +215,10 @@
+ " <transition to='d' />"
+ " </node>"
+ " <node name='d'>"
- + " <transition to='e' />"
+ " <event type='node-leave'>"
+ " <action async='true' expression='#{eventCallback.nodeLeave}' />"
+ " </event>"
+ + " <transition to='e' />"
+ " </node>"
+ " <node name='e' async='true'>"
+ " <transition to='end' />"
@@ -227,6 +228,8 @@
long[] processIds = new long[processExecutionCount];
for (int i = 0; i < processExecutionCount; i++) {
processIds[i] = launchProcess("execution").getId();
+ EventCallback.waitForEvent(Event.EVENTTYPE_NODE_ENTER);
+ EventCallback.waitForEvent(Event.EVENTTYPE_NODE_LEAVE);
}
for (int i = 0; i < processExecutionCount; i++) {
waitForProcessEnd(processIds[i]);
@@ -257,13 +260,13 @@
private void waitForProcessEnd(long processId) {
long startTime = System.currentTimeMillis();
- while (!hasProcessEnded(processId)) {
+ do {
+ EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 1000);
if (System.currentTimeMillis() - startTime > maxWaitTime) {
log.warn("process " + processId + " took too long");
break;
}
- EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 500);
- }
+ } while (!hasProcessEnded(processId));
}
private static Log log = LogFactory.getLog(JmsMessageTest.class);
More information about the jbpm-commits
mailing list