[jbpm-commits] JBoss JBPM SVN: r1890 - in jbossbpm/spec/trunk/modules: ri/src/main/java/org/jboss/bpm/client/internal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 14 10:40:00 EDT 2008


Author: thomas.diesler at jboss.com
Date: 2008-08-14 10:39:59 -0400 (Thu, 14 Aug 2008)
New Revision: 1890

Modified:
   jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java
   jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java
   jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java
Log:
Fix synchronization issues

Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -41,7 +41,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.jboss.bpm.EngineShutdownException;
 import org.jboss.bpm.model.Process;
-import org.jboss.bpm.model.Process.ProcessStatus;
 import org.jboss.bpm.runtime.Attachments;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -159,6 +158,7 @@
    */
   public void destroyProcess(Process proc)
   {
+    log.debug("destroyProcess: " + proc);
     procs.remove(proc.getID());
   }
 
@@ -191,14 +191,6 @@
   }
 
   /**
-   * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
-   * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. <p/> If the process
-   * was aborted this method throws the causing RuntimeException if avaialable.
-   * 
-   */
-  public abstract ProcessStatus waitForEnd(ObjectName procID, long timeout);
-
-  /**
    * Get the handler for the dialect with the given namespace URI
    */
   public DialectHandler getDialectHandler(String nsURI)

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -136,20 +136,23 @@
       boolean hasActiveTokens = tokenExecutor.hasActiveTokens();
       try
       {
-        while (procStatus == ProcessStatus.Active && hasActiveTokens)
+        synchronized (procImpl)
         {
-          try
+          while (procStatus == ProcessStatus.Active && hasActiveTokens)
           {
-            Thread.sleep(100);
+            try
+            {
+              procImpl.wait();
+            }
+            catch (InterruptedException ex)
+            {
+              log.error("Executor thread interrupted");
+            }
+            procStatus = procImpl.getProcessStatus();
+            hasActiveTokens = tokenExecutor.hasActiveTokens();
           }
-          catch (InterruptedException ex)
-          {
-            log.error("Executor thread interrupted");
-          }
-          procStatus = procImpl.getProcessStatus();
-          hasActiveTokens = tokenExecutor.hasActiveTokens();
+          log.debug("End execution thread [status=" + procStatus + ",tokens=" + tokenExecutor.getActiveTokens() + "]");
         }
-        log.debug("End execution thread [status=" + procStatus + ",tokens=" + tokenExecutor.getActiveTokens() + "]");
       }
       finally
       {

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -29,15 +29,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.bpm.BPMException;
-import org.jboss.bpm.ProcessTimeoutException;
 import org.jboss.bpm.client.DialectHandler;
 import org.jboss.bpm.client.DialectRegistry;
 import org.jboss.bpm.client.ExecutionManager;
 import org.jboss.bpm.client.ProcessManager;
 import org.jboss.bpm.model.Process;
-import org.jboss.bpm.model.Process.ProcessStatus;
-import org.jboss.bpm.model.internal.ProcessImpl;
 import org.jboss.bpm.runtime.Attachments;
 
 /**
@@ -75,72 +71,15 @@
     exm.startProcess(proc, att);
   }
 
-  @Override
-  public ProcessStatus waitForEnd(ObjectName procID, long timeout)
-  {
-    Process proc = getProcessStrict(procID);
-    ProcessImpl procImpl = (ProcessImpl)proc;
-    ProcessStatus status = proc.getProcessStatus();
-
-    if (status == ProcessStatus.None || status == ProcessStatus.Ready)
-      throw new IllegalStateException("Cannot wait for process in state: " + status);
-
-    // Get the executor thread for interuption
-    Thread executorThread = procImpl.getExecutorThread();
-    
-    // Wait a little for the process to end
-    boolean forever = (timeout < 1);
-    long now = System.currentTimeMillis();
-    long until = now + timeout;
-    try
-    {
-      while (forever || now < until)
-      {
-        if (status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted)
-        {
-          RuntimeException rte = procImpl.getRuntimeException();
-          if (rte != null)
-            throw new BPMException("Process aborted", rte);
-          
-          break;
-        }
-        
-        // Join the executor thread
-        executorThread.join(timeout);
-        now = System.currentTimeMillis();
-        status = proc.getProcessStatus();
-      }
-      
-      // Throw timeout exception if it took too long
-      if (status != ProcessStatus.Cancelled && status != ProcessStatus.Completed && status != ProcessStatus.Aborted)
-      {
-        ProcessTimeoutException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + procID);
-        log.error(rte);
-        
-        log.error("Interrupt executor thread");
-        executorThread.interrupt();
-        
-        throw rte;
-      }
-    }
-    catch (InterruptedException ex)
-    {
-      log.warn(ex);
-    }
-    finally
-    {
-      // Destroy the process
-      destroyProcess(procImpl);
-    }
-    
-    return status;
-  }
-
   private Process getProcessStrict(ObjectName procID)
   {
     Process proc = getProcessByID(procID);
     if (proc == null)
-      throw new IllegalStateException("Cannot obtain process: " + procID);
+    {
+      IllegalStateException rte = new IllegalStateException("Cannot obtain process: " + procID);
+      log.error(rte);
+      throw rte;
+    }
     return proc;
   }
 }

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -110,7 +110,6 @@
           {
             signalManager.throwSignal(procImpl.getName(), sigHandler.getExitSignal());
           }
-
         }
 
         tokStatus = token.getTokenStatus();
@@ -125,14 +124,19 @@
     catch (RuntimeException rte)
     {
       log.error("Process aborted: " + procImpl, rte);
-      token.setTokenStatus(TokenStatus.Destroyed);
       procImpl.setProcessStatus(ProcessStatus.Aborted);
       procImpl.setRuntimeException(rte);
     }
-//    finally
-//    {
-//      procImpl.notifyAll();
-//    }
+    finally
+    {
+      
+      token.setTokenStatus(TokenStatus.Destroyed);
+      // Notify the ExecutionManager when a token terminates
+      synchronized (procImpl)
+      {
+        procImpl.notifyAll();
+      }
+    }
   }
 
   private SignalHandler getSignalHandler(FlowObject target)

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -35,8 +35,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.bpm.BPMException;
 import org.jboss.bpm.InvalidProcessException;
 import org.jboss.bpm.NotImplementedException;
+import org.jboss.bpm.ProcessTimeoutException;
 import org.jboss.bpm.client.ProcessManager;
 import org.jboss.bpm.client.internal.ProcessManagerImpl;
 import org.jboss.bpm.model.EndEvent;
@@ -235,16 +237,69 @@
 
   public void waitForEnd()
   {
-    ProcessManager pm = ProcessManager.locateProcessManager();
-    pm.waitForEnd(getID(), 0);
+    waitForEndInternal(0);
   }
 
   public void waitForEnd(long timeout)
   {
-    ProcessManager pm = ProcessManager.locateProcessManager();
-    pm.waitForEnd(getID(), timeout);
+    waitForEndInternal(timeout);
   }
+  
+  /**
+   * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
+   * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. If the process
+   * was aborted this method throws the causing RuntimeException if avaialable.
+   */
+  private void waitForEndInternal(long timeout)
+  {
+    if (status == ProcessStatus.None || status == ProcessStatus.Ready)
+      throw new IllegalStateException("Cannot wait for process in state: " + status);
 
+    // Wait a little for the process to end
+    boolean forever = (timeout < 1);
+    long now = System.currentTimeMillis();
+    long until = now + timeout;
+    try
+    {
+      while (forever || now < until)
+      {
+        if (status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted)
+        {
+          if (runtimeException != null)
+            throw new BPMException("Process aborted", runtimeException);
+          
+          break;
+        }
+        
+        // Join the executor thread
+        executorThread.join(timeout);
+        now = System.currentTimeMillis();
+      }
+      
+      // Throw timeout exception if it took too long
+      if (status != ProcessStatus.Cancelled && status != ProcessStatus.Completed && status != ProcessStatus.Aborted)
+      {
+        ProcessTimeoutException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + getID());
+        log.error(rte);
+        
+        log.error("Interrupt executor thread");
+        executorThread.interrupt();
+        
+        throw rte;
+      }
+    }
+    catch (InterruptedException ex)
+    {
+      log.warn(ex);
+    }
+    finally
+    {
+      // Destroy the process
+      ProcessManager procManager = ProcessManager.locateProcessManager();
+      procManager.destroyProcess(this);
+    }
+  }
+
   public FlowObject getFlowObject(String name)
   {
     if (name == null)

Modified: jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -91,6 +91,7 @@
     URL expURL = getResourceURL("samples/airticket/airticket-api10.xml");
     ProcessManager pm = ProcessManager.locateProcessManager();
     Process proc = pm.createProcess(expURL);
+    pm.destroyProcess(proc);
     
     StringWriter strwr = new StringWriter();
     DialectHandler dh = pm.getDialectHandler("urn:bpm.jboss:pdl-0.1");

Modified: jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java	2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java	2008-08-14 14:39:59 UTC (rev 1890)
@@ -27,6 +27,8 @@
 
 import javax.management.ObjectName;
 
+import org.jboss.bpm.client.SignalListener;
+import org.jboss.bpm.client.SignalManager;
 import org.jboss.bpm.model.Process;
 import org.jboss.bpm.model.ProcessBuilder;
 import org.jboss.bpm.model.ProcessBuilderFactory;
@@ -42,6 +44,8 @@
  */
 public class ProcessStartTest extends DefaultEngineTestCase
 {
+  private RuntimeException startException;
+  
   public void testStartProcessReturn() throws Exception
   {
     Process proc = getProcess();
@@ -52,19 +56,34 @@
 
   public void testStartTwice() throws Exception
   {
-    Process proc = getProcess();
-    ObjectName procID = proc.getID();
-    assertEquals(procID, proc.startProcess());
-    try
+    startException = null;
+    
+    final Process proc = getProcess();
+    SignalListener signalListener = new SignalListener()
     {
-      proc.startProcess();
-      fail("Cannot start process twice");
-    }
-    catch (RuntimeException rte)
-    {
-      // expected;
-    }
+      public void catchSignal(Signal signal)
+      {
+        if (signal.getSignalType() == SignalType.SYSTEM_TASK_ENTER)
+        {
+          try
+          {
+            proc.startProcess();
+            fail("Cannot start process twice");
+          }
+          catch (RuntimeException rte)
+          {
+            startException = rte;
+          }
+        }
+      }
+    };
+    SignalManager sm = SignalManager.locateSignalManager();
+    sm.addSignalListener(getName(), signalListener);
+    
+    proc.startProcess();
     proc.waitForEnd();
+    
+    assertNotNull("Start exception expected", startException);
   }
 
   public void testProcessRestart() throws Exception




More information about the jbpm-commits mailing list