[jbpm-commits] JBoss JBPM SVN: r2257 - in jbpm3/trunk/modules: enterprise/jar/src/test/java/org/jbpm/msg/jms and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 15 18:20:58 EDT 2008


Author: alex.guizar at jboss.com
Date: 2008-09-15 18:20:58 -0400 (Mon, 15 Sep 2008)
New Revision: 2257

Modified:
   jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java
   jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java
Log:
[JBPM-1709] replaced monitors with semaphores to make event notifications more reliable

Modified: jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java
===================================================================
--- jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java	2008-09-15 20:30:06 UTC (rev 2256)
+++ jbpm3/trunk/modules/core/src/main/java/org/jbpm/EventCallback.java	2008-09-15 22:20:58 UTC (rev 2257)
@@ -22,6 +22,10 @@
 package org.jbpm;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
@@ -36,6 +40,8 @@
   private static final long serialVersionUID = 1L;
   private static final Log log = LogFactory.getLog(EventCallback.class);
 
+  private static Map<String, Semaphore> eventSemaphores = new HashMap<String, Semaphore>();
+
   public void processStart() {
     registerNotification(Event.EVENTTYPE_PROCESS_START);
   }
@@ -75,15 +81,14 @@
   private static void registerNotification(final String event) {
     Synchronization notification = new Synchronization() {
 
-      public void beforeCompletion() {}
+      public void beforeCompletion() {
+      }
 
       public void afterCompletion(int status) {
         if (status == Status.STATUS_COMMITTED) {
-          String canonEvent = event.intern();
-          log.debug("notifying " + canonEvent + "@" + Integer.toHexString(System.identityHashCode(canonEvent)));
-          synchronized (canonEvent) {
-            canonEvent.notify();
-          }
+          log.debug("sending '" + event + "' notification");
+          Semaphore eventSemaphore = getEventSemaphore(event);
+          eventSemaphore.release();
         }
       }
 
@@ -99,20 +104,39 @@
   }
 
   public static void waitForEvent(String event, long timeout) {
-    String canonEvent = event.intern();
-    long startTime = System.currentTimeMillis();
-    log.debug("waiting for " + canonEvent + "@" + Integer.toHexString(System.identityHashCode(canonEvent)));
+    log.debug("waiting for " + event);
+    Semaphore eventSemaphore = getEventSemaphore(event);
     try {
-      synchronized (canonEvent) {
-        canonEvent.wait(timeout);
+      if (eventSemaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+        log.debug("received '" + event + "' notification");
       }
+      else {
+        log.warn("event '" + event + "' did not occur within " + timeout + " ms");
+      }
     }
     catch (InterruptedException e) {
       // reassert interruption
       Thread.currentThread().interrupt();
     }
-    if (System.currentTimeMillis() - startTime >= timeout) {
-      log.debug("event '" + canonEvent + "' took longer than " + timeout + " ms to occur");
+  }
+
+  private static Semaphore getEventSemaphore(String event) {
+    synchronized (eventSemaphores) {
+      Semaphore semaphore = eventSemaphores.get(event);
+      if (semaphore == null) {
+        semaphore = new Semaphore(0);
+        eventSemaphores.put(event, semaphore);
+      }
+      return semaphore;
     }
   }
+
+  public static void clear() {
+    for (Map.Entry<String, Semaphore> entry : eventSemaphores.entrySet()) {
+      int permits = entry.getValue().drainPermits();
+      if (permits != 0) {
+        log.warn("event '" + entry.getKey() + "' has " + permits + " outstanding notifications");
+      }
+    }
+  }
 }
\ No newline at end of file

Modified: jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java
===================================================================
--- jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java	2008-09-15 20:30:06 UTC (rev 2256)
+++ jbpm3/trunk/modules/enterprise/jar/src/test/java/org/jbpm/msg/jms/JmsMessageTest.java	2008-09-15 22:20:58 UTC (rev 2257)
@@ -55,7 +55,7 @@
   private static LocalCommandServiceHome commandServiceHome;
 
   static final int processExecutionCount = 20;
-  static final int maxWaitTime = 30000;
+  static final int maxWaitTime = 10000;
 
   public static Test suite() throws Exception {
      return new IntegrationTestSetup(JmsMessageTest.class, "enterprise-test.war");
@@ -77,6 +77,7 @@
 
   protected void tearDown() throws Exception {
     log.info("### " + getName() + " done ###");
+    EventCallback.clear();
     commandService = null;
   }
 
@@ -168,7 +169,7 @@
         + "    <transition name='d' to='d' />"
         + "    <transition name='e' to='e' />"
         + "  </fork>"
-        + "  <node name='a' async='true'>"
+        + "  <node name='a'>"
         + "    <transition to='j' />"
         + "  </node>"
         + "  <node name='b' async='true'>"
@@ -214,10 +215,10 @@
         + "    <transition to='d' />"
         + "  </node>"
         + "  <node name='d'>"
-        + "    <transition to='e' />"
         + "    <event type='node-leave'>"
         + "      <action async='true' expression='#{eventCallback.nodeLeave}' />"
         + "    </event>"
+        + "    <transition to='e' />"
         + "  </node>"
         + "  <node name='e' async='true'>"
         + "    <transition to='end' />"
@@ -227,6 +228,8 @@
     long[] processIds = new long[processExecutionCount];
     for (int i = 0; i < processExecutionCount; i++) {
       processIds[i] = launchProcess("execution").getId();
+      EventCallback.waitForEvent(Event.EVENTTYPE_NODE_ENTER);
+      EventCallback.waitForEvent(Event.EVENTTYPE_NODE_LEAVE);
     }
     for (int i = 0; i < processExecutionCount; i++) {
       waitForProcessEnd(processIds[i]);
@@ -257,13 +260,13 @@
 
   private void waitForProcessEnd(long processId) {
     long startTime = System.currentTimeMillis();
-    while (!hasProcessEnded(processId)) {
+    do {
+      EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 1000);
       if (System.currentTimeMillis() - startTime > maxWaitTime) {
         log.warn("process " + processId + " took too long");
         break;
       }
-      EventCallback.waitForEvent(Event.EVENTTYPE_PROCESS_END, 500);
-    }
+    } while (!hasProcessEnded(processId));
   }
 
   private static Log log = LogFactory.getLog(JmsMessageTest.class);




More information about the jbpm-commits mailing list