[jbpm-commits] JBoss JBPM SVN: r1945 - in jbossbpm/spec/trunk/modules: api/src/main/java/org/jboss/bpm/model and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 20 15:39:22 EDT 2008


Author: thomas.diesler at jboss.com
Date: 2008-08-20 15:39:22 -0400 (Wed, 20 Aug 2008)
New Revision: 1945

Added:
   jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/
   jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
   jbossbpm/spec/trunk/modules/ri/src/test/resources/
   jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
   jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
Modified:
   jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java
   jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java
   jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java
Log:
WIP

Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -28,6 +28,7 @@
 
 import org.jboss.bpm.model.Process;
 import org.jboss.bpm.model.StartEvent;
+import org.jboss.bpm.model.Process.ProcessStatus;
 import org.jboss.bpm.runtime.Attachments;
 
 /**
@@ -38,8 +39,7 @@
  */
 public abstract class ExecutionManager
 {
-  private int maxProcessThreads = 10;
-  private ExecutorService procExecutor = Executors.newFixedThreadPool(maxProcessThreads);
+  private ExecutorService procExecutor = Executors.newCachedThreadPool();
   
   // Hide public constructor
   protected ExecutionManager()
@@ -78,4 +78,20 @@
    * @param att The Attachments in the ExecutionContext
    */
   public abstract void startProcess(StartEvent start, Attachments att);
+  
+  /**
+   * All Tokens that are generated at the Start Event for that Process must eventually arrive at an End Event. 
+   * The Process will be in a running state until all Tokens are consumed.
+   * <p/>
+   * This method until the process ends without timeout.
+   */
+  public abstract ProcessStatus waitForEnd(Process proc);
+  
+  /**
+   * All Tokens that are generated at the Start Event for that Process must eventually arrive at an End Event. 
+   * The Process will be in a running state until all Tokens are consumed.
+   * <p/>
+   * This method until the process ends with a given timeout.
+   */
+  public abstract ProcessStatus waitForEnd(Process proc, long timeout);
 }

Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -61,12 +61,12 @@
   /**
    * Get the process state
    */
-  Process.ProcessStatus getProcessStatus();
+  ProcessStatus getProcessStatus();
   
   /**
    * Get the process type
    */
-  Process.ProcessType getProcessType();
+  ProcessType getProcessType();
 
   /**
    * Get the list of flow objects
@@ -86,7 +86,7 @@
   
   /**
    * One or more Performers MAY be entered. The Performers attribute defines the resource that will be responsible for
-   * the Process. The Performers entry could be in the form of a specific individual, a group, an organization role or
+   * the  The Performers entry could be in the form of a specific individual, a group, an organization role or
    * position, or an organization.
    */
   List<String> getPerformers();
@@ -98,17 +98,17 @@
   List<Assignment> getAssignments();
 
   /**
-   * The InputSets attribute defines the data requirements for input to the Process. Zero or more InputSets MAY be
+   * The InputSets attribute defines the data requirements for input to the  Zero or more InputSets MAY be
    * defined. Each Input set is sufficient to allow the Process to be performed (if it has first been instantiated by
    * the appropriate signal arriving from an incoming Sequence Flow)
    */
   List<InputSet> getInputSets();
 
   /**
-   * The OutputSets attribute defines the data requirements for output from the Process. Zero or more OutputSets MAY be
+   * The OutputSets attribute defines the data requirements for output from the  Zero or more OutputSets MAY be
    * defined. At the completion of the Process, only one of the OutputSets may be produced--It is up to the
    * implementation of the Process to determine which set will be produced. However, the IORules attribute MAY indicate
-   * a relationship between an OutputSet and an InputSet that started the Process.
+   * a relationship between an OutputSet and an InputSet that started the 
    */
   List<OutputSet> getOutputSets();
 

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -25,13 +25,14 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import javax.management.ObjectName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.bpm.BPMException;
 import org.jboss.bpm.InvalidProcessException;
+import org.jboss.bpm.ProcessTimeoutException;
 import org.jboss.bpm.client.ExecutionManager;
 import org.jboss.bpm.client.ProcessManager;
 import org.jboss.bpm.client.SignalManager;
@@ -48,7 +49,6 @@
 import org.jboss.bpm.ri.model.impl.RuntimeProcessImpl;
 import org.jboss.bpm.ri.model.impl.SequenceFlowImpl;
 import org.jboss.bpm.ri.model.impl.TokenExecutorImpl;
-import org.jboss.bpm.ri.model.impl.TokenExecutorImpl.StartCallback;
 import org.jboss.bpm.ri.runtime.DelegatingToken;
 import org.jboss.bpm.ri.runtime.MutableToken;
 import org.jboss.bpm.ri.runtime.RuntimeProcess;
@@ -73,69 +73,163 @@
   private Map<ObjectName, RuntimeProcess> runtimeProcesses = new HashMap<ObjectName, RuntimeProcess>();
 
   @Override
-  public void startProcess(final Process proc, final Attachments att)
+  public void startProcess(Process proc, Attachments att)
   {
-    // Setup the StartCallback that will be called from the Token Executor
-    final RuntimeProcessImpl rtProc = new RuntimeProcessImpl(proc);
-    final RunnableProcess runnableProcess = new RunnableProcess(rtProc);
-    runtimeProcesses.put(proc.getID(), rtProc);
+    // Get the None Start Event if there is one and start the initial flow
+    StartEvent start = getNoneStartEvent(proc);
+    if (start != null)
+      startProcessInternal(start, att);
+  }
+
+  @Override
+  public void startProcess(StartEvent start, Attachments att)
+  {
+    startProcessInternal(start, att);
+  }
+
+  private synchronized void startProcessInternal(StartEvent start, Attachments att)
+  {
+    @SuppressWarnings("serial")
+    class InitialFlow extends SequenceFlowImpl
+    {
+      InitialFlow(StartEvent start)
+      {
+        super(start.getName());
+        setTargetRef(start);
+      }
+    }
     
-    StartCallback startCallback = new StartCallback()
+    Process proc = start.getProcess();
+    ProcessImpl procImpl = (ProcessImpl)proc;
+    RuntimeProcess rtProc = getRuntimeProcess(proc, false);
+    boolean startProcessThread = (rtProc == null); 
+    
+    // Reset the process if already terminated
+    if (isProcessTerminated(proc))
+      procImpl.resetProcess();
+    
+    ProcessStatus procStatus = proc.getProcessStatus();
+    if (procStatus != ProcessStatus.Ready)
+      throw new IllegalStateException("Cannot start process in state: " + procStatus);
+
+    // Register the process if needed
+    ProcessManager pm = ProcessManager.locateProcessManager();
+    if (pm.getProcessByID(proc.getID()) == null)
+      pm.registerProcess(proc);
+    
+    rtProc = getRuntimeProcess(proc, true);
+    TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
+    TokenImpl initialToken = new TokenImpl(att);
+    tokenExecutor.create(initialToken, new InitialFlow(start));
+    
+    // Start a new process thread
+    if (startProcessThread)
     {
-      public void start(Token token)
+      RunnableProcess runnable = new RunnableProcess(rtProc);
+      getProcessExecutor().execute(runnable);
+      synchronized (proc)
       {
-        // Process start time assignments
-        startTimeAssignments(proc, token);
+        while (proc.getProcessStatus() != ProcessStatus.Active)
+        {
+          try
+          {
+            log.debug("Wait to become Active " + proc);
+            proc.wait();
+            log.debug("Notified " + proc);
+          }
+          catch (InterruptedException ex)
+          {
+            log.error(ex);
+          }
+        }
+      }
+    }
+    
+    // Do the start time assignments
+    startTimeAssignments(proc, initialToken);
+    
+    // Start the initial token
+    tokenExecutor.start(initialToken);
+  }
 
-        // Execute the Process through the ExecutorService
-        ExecutorService processExecutor = getProcessExecutor();
-        log.debug("Execute runnableProcess");
-        processExecutor.execute(runnableProcess);
-        //new Thread(runnableProcess).start();
+  public ProcessStatus waitForEnd(Process proc)
+  {
+    return waitForEndInternal(proc, 0);
+  }
 
+  public ProcessStatus waitForEnd(Process proc, long timeout)
+  {
+    return waitForEndInternal(proc, timeout);
+  }
+
+  /**
+   * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
+   * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. If the process was
+   * aborted this method throws the causing RuntimeException if avaialable.
+   */
+  private ProcessStatus waitForEndInternal(Process proc, long timeout)
+  {
+    ProcessImpl procImpl = (ProcessImpl)proc;
+    
+    ProcessStatus status = proc.getProcessStatus();
+    if (status == ProcessStatus.None)
+      throw new IllegalStateException("Cannot wait for process in state: " + status);
+
+    // Wait a little for the process to end
+    boolean forever = (timeout < 1);
+    long now = System.currentTimeMillis();
+    long until = now + timeout;
+    try
+    {
+      while (forever || now < until)
+      {
         synchronized (proc)
         {
-          // Wait for the process to become active
-          while (proc.getProcessStatus() == ProcessStatus.Ready)
+          if (isProcessTerminated(proc))
           {
-            try
+            if (procImpl.getRuntimeException() != null)
             {
-              log.debug("Wait for process to become active");
-              proc.wait();
-              log.debug("Notified: " + proc);
+              throw new BPMException("Process aborted", procImpl.getRuntimeException());
             }
-            catch (InterruptedException ex)
+            else
             {
-              log.error(ex);
+              break;
             }
           }
+          
+          // Start waiting to get notified
+          long waitTimeout = forever ? 0 : until - now;
+          log.debug("Wait for " + waitTimeout + "ms on " + proc);
+          proc.wait(waitTimeout);
+          log.debug("Notified " + proc);
         }
+        now = System.currentTimeMillis();
       }
-    };
-    rtProc.setTokenExecutor(new TokenExecutorImpl(rtProc, startCallback));
-
-    // Get the None Start Event if there is one and start the initial flow
-    StartEvent start = getNoneStartEvent(proc);
-    if (start != null)
-      startProcessInternal(start, att);
+      
+      // Throw timeout exception if it took too long
+      if (isProcessTerminated(proc) == false)
+      {
+        RuntimeException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + proc.getID());
+        procImpl.setRuntimeException(rte);
+        log.error(rte);
+        throw rte;
+      }
+    }
+    catch (InterruptedException ex)
+    {
+      log.warn(ex);
+    }
+    
+    status = proc.getProcessStatus();
+    return status;
   }
-
-  @Override
-  public void startProcess(StartEvent start, Attachments att)
+  
+  private boolean isProcessTerminated(Process proc)
   {
-    startProcessInternal(start, att);
+    ProcessStatus status = proc.getProcessStatus();
+    return status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted;
   }
-
-  private synchronized void startProcessInternal(StartEvent start, Attachments att)
-  {
-    ObjectName procID = start.getProcess().getID();
-    RuntimeProcess rtProc = runtimeProcesses.get(procID);
-    TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
-    TokenImpl token = new TokenImpl(att);
-    tokenExecutor.create(token, new InitialFlow(start));
-    tokenExecutor.start(token);
-  }
-
+  
   private StartEvent getNoneStartEvent(Process proc)
   {
     StartEvent start = null;
@@ -151,6 +245,22 @@
     return start;
   }
 
+  private RuntimeProcess getRuntimeProcess(Process proc, boolean createNew)
+  {
+    RuntimeProcess rtProcess;
+    synchronized (runtimeProcesses)
+    {
+      rtProcess = runtimeProcesses.get(proc.getID());
+      if (rtProcess == null && createNew)
+      {
+        TokenExecutorImpl tokenExecutor = new TokenExecutorImpl();
+        rtProcess = new RuntimeProcessImpl(proc, tokenExecutor);
+        runtimeProcesses.put(proc.getID(), rtProcess);
+      }
+    }
+    return rtProcess;
+  }
+
   // Evaluate the Start time assignments
   private void startTimeAssignments(Process proc, Token token)
   {
@@ -169,20 +279,7 @@
     }
   }
 
-  /**
-   * The initial flow to the StartEvent
-   */
-  @SuppressWarnings("serial")
-  class InitialFlow extends SequenceFlowImpl
-  {
-    InitialFlow(StartEvent start)
-    {
-      super(start.getName());
-      setTargetRef(start);
-    }
-  }
-
-  /**
+  /***************************************************************
    * The runnable Process
    */
   class RunnableProcess implements Runnable
@@ -213,24 +310,30 @@
           procImpl.setProcessStatus(ProcessStatus.Active);
           signalManager.throwSignal(proc.getName(), new Signal(proc.getName(), SignalType.SYSTEM_PROCESS_ENTER));
 
-          // Notify that the Process is Active
+          // Notify that the process is now Active
+          log.debug("Notify: " + proc);
           proc.notifyAll();
-
+        }
+        
+        synchronized (rtProc)
+        {
           // Wait until there are no more runnable tokens
           while (tokenExecutor.hasRunnableTokens())
           {
             try
             {
-              log.debug("Start Waiting");
-              proc.wait();
-              log.debug("Stop Waiting");
+              log.debug("Wait: " + rtProc);
+              rtProc.wait();
+              log.debug("Notified: " + rtProc);
             }
             catch (InterruptedException ex)
             {
-              log.error("Executor thread interrupted");
+              log.error(ex);
             }
           }
+          
           log.debug("End execution thread [proc=" + procName + ",status=" + proc.getProcessStatus() + "]");
+          procImpl.setProcessStatus(ProcessStatus.Completed);
         }
       }
       finally
@@ -239,16 +342,13 @@
 
         synchronized (proc)
         {
-          if (proc.getProcessStatus() == ProcessStatus.Active)
-            procImpl.setProcessStatus(ProcessStatus.Completed);
-
+          // Notify that the process has now ended
+          log.debug("Notify: " + proc);
+          proc.notifyAll();
+          
           ProcessManager procManager = ProcessManager.locateProcessManager();
           procManager.unregisterProcess(proc);
           runtimeProcesses.remove(procID);
-
-          // Notify the Process
-          log.debug("Notify Process");
-          proc.notifyAll();
         }
       }
     }

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -223,7 +223,7 @@
     return getID();
   }
 
-  private void resetProcess()
+  public void resetProcess()
   {
     log.debug("Reset process: " + this);
     for (FlowObject fo : flowObjects)

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -38,9 +38,10 @@
   private Process process;
   private TokenExecutor tokenExecutor;
 
-  public RuntimeProcessImpl(Process process)
+  public RuntimeProcessImpl(Process process, TokenExecutor tokenExecutor)
   {
     this.process = process;
+    this.tokenExecutor = tokenExecutor;
   }
 
   public Process getProcess()
@@ -52,9 +53,4 @@
   {
     return tokenExecutor;
   }
-
-  public void setTokenExecutor(TokenExecutor tokenExecutor)
-  {
-    this.tokenExecutor = tokenExecutor;
-  }
 }
\ No newline at end of file

Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java	2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -34,13 +34,16 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.bpm.model.ConnectingObject;
+import org.jboss.bpm.model.FlowObject;
 import org.jboss.bpm.model.Process;
 import org.jboss.bpm.model.SequenceFlow;
 import org.jboss.bpm.model.Process.ProcessStatus;
-import org.jboss.bpm.ri.client.RunnableToken;
+import org.jboss.bpm.ri.runtime.DelegatingToken;
 import org.jboss.bpm.ri.runtime.MutableToken;
 import org.jboss.bpm.ri.runtime.RuntimeProcess;
 import org.jboss.bpm.runtime.FlowHandler;
+import org.jboss.bpm.runtime.HandlerSupport;
+import org.jboss.bpm.runtime.SignalHandler;
 import org.jboss.bpm.runtime.Token;
 import org.jboss.bpm.runtime.TokenExecutor;
 import org.jboss.bpm.runtime.Token.TokenStatus;
@@ -61,109 +64,226 @@
   {
     void start(Token token);
   }
-
-  private RuntimeProcess rtProc;
+  
   private ExecutorService executor = Executors.newCachedThreadPool();
   private Map<String, RunnableToken> runnableTokens = new HashMap<String, RunnableToken>();
-  private StartCallback startCallback;
 
-  public TokenExecutorImpl(RuntimeProcess rtProc, StartCallback startCallback)
-  {
-    this.rtProc = rtProc;
-    this.startCallback = startCallback;
-  }
-
   public Set<Token> getRunnableTokens()
   {
-    Set<Token> tokenSet = new HashSet<Token>();
-    for (RunnableToken rt : runnableTokens.values())
-      tokenSet.add(rt.getToken());
+    synchronized (runnableTokens)
+    {
+      Set<Token> tokenSet = new HashSet<Token>();
+      for (RunnableToken rt : runnableTokens.values())
+        tokenSet.add(rt.getToken());
 
-    return Collections.unmodifiableSet(tokenSet);
+      return Collections.unmodifiableSet(tokenSet);
+    }
   }
 
   public boolean hasRunnableTokens()
   {
-    return runnableTokens.size() > 0;
+    synchronized (runnableTokens)
+    {
+      return runnableTokens.size() > 0;
+    }
   }
 
   public void create(Token token, SequenceFlow initialFlow)
   {
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setFlow(initialFlow);
-    RunnableToken rtToken = new RunnableToken(rtProc, mutableToken);
-    runnableTokens.put(token.getTokenID(), rtToken);
+    synchronized (runnableTokens)
+    {
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setFlow(initialFlow);
+      Process proc = initialFlow.getTargetRef().getProcess();
+      RuntimeProcess rtProc = new RuntimeProcessImpl(proc, this);
+      RunnableToken rtToken = new RunnableToken(rtProc, mutableToken);
+      runnableTokens.put(token.getTokenID(), rtToken);
+    }
   }
 
   public void start(Token token)
   {
-    Process proc = rtProc.getProcess();
-    ProcessStatus procStatus = proc.getProcessStatus();
-    if (procStatus != ProcessStatus.Ready && procStatus != ProcessStatus.Active)
-      throw new IllegalStateException("Cannot start token to process in state: " + procStatus);
+    synchronized (runnableTokens)
+    {
+      ProcessStatus procStatus = getProcess(token).getProcessStatus();
+      if (procStatus != ProcessStatus.Ready && procStatus != ProcessStatus.Active)
+        throw new IllegalStateException("Cannot start token to process in state: " + procStatus);
 
-    log.debug("Start Token: " + token);
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setTokenStatus(TokenStatus.Started);
-
-    if (startCallback != null)
-    {
-      log.debug("Call start callback: " + token);
-      startCallback.start(token);
-      startCallback = null;
+      RunnableToken rtToken = runnableTokens.get(token.getTokenID());
+      executor.submit(rtToken);
     }
-
-    RunnableToken rtToken = runnableTokens.get(token.getTokenID());
-    executor.submit(rtToken);
   }
 
   public void move(Token token, SequenceFlow flow)
   {
-    if (flow == null)
-      throw new IllegalArgumentException("Flow cannot be null");
+    synchronized (runnableTokens)
+    {
+      if (flow == null)
+        throw new IllegalArgumentException("Flow cannot be null");
 
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setFlow(flow);
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setFlow(flow);
+    }
   }
 
   public void stop(Token token)
   {
-    log.debug("Stop Token: " + token);
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setTokenStatus(TokenStatus.Stoped);
+    synchronized (runnableTokens)
+    {
+      log.debug("Stop Token: " + token);
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setTokenStatus(TokenStatus.Stoped);
+    }
   }
 
   public void destroy(Token token)
   {
-    log.debug("Destroy Token: " + token);
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setTokenStatus(TokenStatus.Destroyed);
-    runnableTokens.remove(token.getTokenID());
+    synchronized (runnableTokens)
+    {
+      log.debug("Destroy Token: " + token);
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setTokenStatus(TokenStatus.Destroyed);
+      runnableTokens.remove(token.getTokenID());
+    }
   }
 
   public String suspend(Token token)
   {
-    log.debug("Suspend Token: " + token);
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setTokenStatus(TokenStatus.Suspended);
-    return token.getTokenID();
+    synchronized (runnableTokens)
+    {
+      log.debug("Suspend Token: " + token);
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setTokenStatus(TokenStatus.Suspended);
+      return token.getTokenID();
+    }
   }
 
   public Token activate(String tokenID)
   {
-    RunnableToken rtToken = runnableTokens.get(tokenID);
-    if (rtToken == null)
-      throw new IllegalStateException("Not a runnable token: " + tokenID);
+    synchronized (runnableTokens)
+    {
+      RunnableToken rtToken = runnableTokens.get(tokenID);
+      if (rtToken == null)
+        throw new IllegalStateException("Not a runnable token: " + tokenID);
 
-    Token token = rtToken.getToken();
-    if (token.getTokenStatus() != TokenStatus.Suspended)
-      throw new IllegalStateException("Activate token in state: " + token.getTokenStatus());
+      Token token = rtToken.getToken();
+      if (token.getTokenStatus() != TokenStatus.Suspended)
+        throw new IllegalStateException("Activate token in state: " + token.getTokenStatus());
 
-    log.debug("Activate Token: " + token);
-    MutableToken mutableToken = (MutableToken)token;
-    mutableToken.setTokenStatus(TokenStatus.Started);
+      log.debug("Activate Token: " + token);
+      MutableToken mutableToken = (MutableToken)token;
+      mutableToken.setTokenStatus(TokenStatus.Started);
 
-    executor.submit(rtToken);
-    return token;
+      executor.submit(rtToken);
+      return token;
+    }
   }
+
+  private Process getProcess(Token token)
+  {
+    return token.getFlow().getTargetRef().getProcess();
+  }
+
+  /****************************************************
+   * The runnable Token
+   */
+  class RunnableToken implements Runnable
+  {
+    private RuntimeProcess rtProc;
+    private MutableToken token;
+
+    public RunnableToken(RuntimeProcess rtProc, MutableToken token)
+    {
+      this.rtProc = rtProc;
+      this.token = token;
+    }
+
+    public Token getToken()
+    {
+      return token;
+    }
+
+    public void run()
+    {
+      Process proc = rtProc.getProcess();
+      TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
+      try
+      {
+        ConnectingObject flow = token.getFlow();
+        if (flow == null)
+          throw new IllegalStateException("Cannot obtain initial flow");
+
+        TokenStatus tokStatus = token.getTokenStatus();
+        ProcessStatus procStatus = proc.getProcessStatus();
+        while (procStatus == ProcessStatus.Active && tokStatus == TokenStatus.Started)
+        {
+          // Get the target and its handlers
+          FlowObject flowObject = token.getFlow().getTargetRef();
+          FlowObjectImpl flowObjectImpl = (FlowObjectImpl)flowObject;
+          SignalHandler sigHandler = getSignalHandler(flowObject);
+
+          // Synchronize execution on the target FlowObject
+          synchronized (flowObject)
+          {
+            // Throw the Enter Signal
+            sigHandler.throwEnterSignal();
+
+            // Create a Token that includes properties from the current Activity
+            DelegatingToken delegatingToken = new DelegatingToken(token);
+
+            // Execute the target FlowObject
+            flowObjectImpl.execute(delegatingToken);
+
+            // Transfer the token to the FlowHandler
+            flowObjectImpl.executeFlowHandler(tokenExecutor, delegatingToken);
+
+            // Throw the Exit Signal
+            sigHandler.throwExitSignal();
+
+            tokStatus = token.getTokenStatus();
+            procStatus = proc.getProcessStatus();
+          }
+        }
+        
+        // Notify Process on token termination
+        terminateToken(proc, tokenExecutor);
+      }
+      catch (RuntimeException rte)
+      {
+        log.error("Process aborted: " + proc, rte);
+        ((ProcessImpl)proc).setRuntimeException(rte);
+        
+        // Notify Process on token termination
+        terminateToken(proc, tokenExecutor);
+      }
+    }
+
+    // Remove the token from the list of runnable tokens then notify the process
+    private void terminateToken(Process proc, TokenExecutor tokenExecutor)
+    {
+      tokenExecutor.destroy(token);
+      synchronized (rtProc)
+      {
+        log.debug("Notify: " + rtProc);
+        rtProc.notifyAll();
+      }
+    }
+
+    private SignalHandler getSignalHandler(FlowObject target)
+    {
+      HandlerSupport handlerSupport = getHandlerSupport(target);
+      SignalHandler handler = handlerSupport.getSignalHandler();
+      if (handler == null)
+        throw new IllegalStateException("Cannot obtain signal handler from: " + target);
+
+      return handler;
+    }
+
+    private HandlerSupport getHandlerSupport(FlowObject fo)
+    {
+      if (fo instanceof HandlerSupport == false)
+        throw new IllegalStateException("Flow object does not implement handler support: " + fo);
+      return (HandlerSupport)fo;
+    }
+  }
 }
\ No newline at end of file

Added: jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java	                        (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java	2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.bpm.runtime;
+
+// $Id$
+
+import org.jboss.bpm.client.ExecutionManager;
+import org.jboss.bpm.model.Process;
+import org.jboss.bpm.model.ProcessBuilder;
+import org.jboss.bpm.model.ProcessBuilderFactory;
+import org.jboss.bpm.model.Process.ProcessStatus;
+import org.jboss.bpm.test.DefaultEngineTestCase;
+
+/**
+ * Test the ExecutionManager
+ * 
+ * @author thomas.diesler at jboss.com
+ * @since 08-Jul-2008
+ */
+public class ExecutionManagerTest extends DefaultEngineTestCase
+{
+  public void testSequenceFlow() throws Exception
+  {
+    ProcessBuilder procBuilder = ProcessBuilderFactory.newInstance().newProcessBuilder();
+    procBuilder.addProcess(getName()).addStartEvent("Start").addSequenceFlow("Task");
+    procBuilder.addTask("Task").addSequenceFlow("End").addEndEvent("End");
+    Process proc = procBuilder.getProcess();
+
+    ExecutionManager em = ExecutionManager.locateExecutionManager();
+    em.startProcess(proc, null);
+    
+    ProcessStatus status = em.waitForEnd(proc);
+    assertEquals(ProcessStatus.Completed, status);
+  }
+  
+  public void testRestartSequenceFlow() throws Exception
+  {
+    ProcessBuilder procBuilder = ProcessBuilderFactory.newInstance().newProcessBuilder();
+    procBuilder.addProcess(getName()).addStartEvent("Start").addSequenceFlow("Task");
+    procBuilder.addTask("Task").addSequenceFlow("End").addEndEvent("End");
+    Process proc = procBuilder.getProcess();
+
+    ExecutionManager em = ExecutionManager.locateExecutionManager();
+    em.startProcess(proc, null);
+    
+    ProcessStatus status = em.waitForEnd(proc);
+    assertEquals(ProcessStatus.Completed, status);
+    
+    em.startProcess(proc, null);
+    
+    status = em.waitForEnd(proc);
+    assertEquals(ProcessStatus.Completed, status);
+  }
+}


Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml	                        (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml	2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,45 @@
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+ 
+  <!-- The KernelLocator -->
+  <bean name="KernelLocator" class="org.jboss.kernel.plugins.util.KernelLocator"/>
+  
+  <!-- The Builder Factories -->
+  <bean name="jBPMProcessBuilderFactory" class="org.jboss.bpm.ri.model.impl.ProcessBuilderFactoryImpl"/>
+  <bean name="jBPMMessageBuilderFactory" class="org.jboss.bpm.ri.model.impl.MessageBuilderFactoryImpl"/>
+  <bean name="jBPMPropertyBuilderFactory" class="org.jboss.bpm.ri.model.impl.PropertyBuilderFactoryImpl"/>
+  
+  <!-- The ProcessEngine -->
+  <bean name="jBPMProcessEngine" class="org.jboss.bpm.ri.client.ProcessEngineImpl">
+    <property name="processManager"><inject bean="jBPMProcessManager"/></property>
+    <property name="executionManager"><inject bean="jBPMExecutionManager"/></property>
+    <property name="signalManager"><inject bean="jBPMSignalManager"/></property>
+    <property name="messageManager"><inject bean="jBPMMessageManager"/></property>
+  </bean>
+
+  <!-- The Managers -->
+  <bean name="jBPMExecutionManager" class="org.jboss.bpm.ri.client.ExecutionManagerImpl"/>
+  <bean name="jBPMSignalManager" class="org.jboss.bpm.ri.client.SignalManagerImpl"/>
+  <bean name="jBPMMessageManager" class="org.jboss.bpm.ri.client.MessageManagerImpl"/>
+  
+  <!-- The ProcessManager -->
+  <bean name="jBPMProcessManager" class="org.jboss.bpm.ri.client.ProcessManagerImpl">
+    <property name="dialectRegistry"><inject bean="jBPMDialectRegistry"/></property>
+    <property name="dialectHandlers">
+      <map keyClass="java.lang.String" valueClass="org.jboss.bpm.client.DialectHandler">
+      </map>
+    </property>
+  </bean>
+
+  <!-- The DialectRegistry -->
+  <bean name="jBPMDialectRegistry" class="org.jboss.bpm.client.DialectRegistry">
+    <property name="registry">
+      <map keyClass="java.lang.String" valueClass="java.lang.String">
+        <entry><key>urn:bpm.jboss:pdl-0.1</key><value>api10</value></entry>
+        <entry><key>urn:jbpm.org:jpdl-3.2</key><value>jpdl32</value></entry>
+        <entry><key>http://stp.eclipse.org/bpmn</key><value>stp</value></entry>
+        <entry><key>http://www.wfmc.org/2008/XPDL2.1</key><value>xpdl21</value></entry>
+      </map>
+    </property>
+  </bean>
+  
+</deployment>
\ No newline at end of file


Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml	                        (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml	2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+  <!-- ================================= -->
+  <!-- Preserve messages in a local file -->
+  <!-- ================================= -->
+
+  <appender name="FILE" class="org.apache.log4j.FileAppender">
+    <param name="File" value="${log4j.output.dir}/test.log"/>
+    <param name="Append" value="false"/>
+    <layout class="org.apache.log4j.PatternLayout">
+      <!-- The default pattern: Date Priority [Category] Message\n -->
+      <param name="ConversionPattern" value="%d %-5p [%c:%L] %m%n"/>
+    </layout>
+  </appender>
+  
+  <!-- ============================== -->
+  <!-- Append messages to the console -->
+  <!-- ============================== -->
+
+  <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out" />
+    <param name="Threshold" value="INFO" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{HH:mm:ss,SSS} [%t] %-5p %C{1} : %m%n" />
+    </layout>
+  </appender>
+
+  <!-- ================ -->
+  <!-- Limit categories -->
+  <!-- ================ -->
+
+  <category name="org.jbpm">
+    <priority value="DEBUG" />
+  </category>
+
+  <category name="org.hibernate">
+    <priority value="INFO" />
+  </category>
+
+  <!-- hide optimistic locking failures -->
+  <category name="org.hibernate.event.def.AbstractFlushingEventListener">
+    <priority value="FATAL" />
+  </category>
+
+  <!-- ======================= -->
+  <!-- Setup the Root category -->
+  <!-- ======================= -->
+
+  <root>
+    <!--appender-ref ref="CONSOLE"/-->
+    <appender-ref ref="FILE"/>
+  </root>
+
+</log4j:configuration>


Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF




More information about the jbpm-commits mailing list