[jbpm-commits] JBoss JBPM SVN: r1945 - in jbossbpm/spec/trunk/modules: api/src/main/java/org/jboss/bpm/model and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 20 15:39:22 EDT 2008
Author: thomas.diesler at jboss.com
Date: 2008-08-20 15:39:22 -0400 (Wed, 20 Aug 2008)
New Revision: 1945
Added:
jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/
jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
jbossbpm/spec/trunk/modules/ri/src/test/resources/
jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
Modified:
jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java
jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java
Log:
WIP
Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ExecutionManager.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -28,6 +28,7 @@
import org.jboss.bpm.model.Process;
import org.jboss.bpm.model.StartEvent;
+import org.jboss.bpm.model.Process.ProcessStatus;
import org.jboss.bpm.runtime.Attachments;
/**
@@ -38,8 +39,7 @@
*/
public abstract class ExecutionManager
{
- private int maxProcessThreads = 10;
- private ExecutorService procExecutor = Executors.newFixedThreadPool(maxProcessThreads);
+ private ExecutorService procExecutor = Executors.newCachedThreadPool();
// Hide public constructor
protected ExecutionManager()
@@ -78,4 +78,20 @@
* @param att The Attachments in the ExecutionContext
*/
public abstract void startProcess(StartEvent start, Attachments att);
+
+ /**
+ * All Tokens that are generated at the Start Event for that Process must eventually arrive at an End Event.
+ * The Process will be in a running state until all Tokens are consumed.
+ * <p/>
+ * This method blocks until the process ends, without a timeout.
+ */
+ public abstract ProcessStatus waitForEnd(Process proc);
+
+ /**
+ * All Tokens that are generated at the Start Event for that Process must eventually arrive at an End Event.
+ * The Process will be in a running state until all Tokens are consumed.
+ * <p/>
+ * This method blocks until the process ends or the given timeout (in milliseconds) expires.
+ */
+ public abstract ProcessStatus waitForEnd(Process proc, long timeout);
}
Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/model/Process.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -61,12 +61,12 @@
/**
* Get the process state
*/
- Process.ProcessStatus getProcessStatus();
+ ProcessStatus getProcessStatus();
/**
* Get the process type
*/
- Process.ProcessType getProcessType();
+ ProcessType getProcessType();
/**
* Get the list of flow objects
@@ -86,7 +86,7 @@
/**
* One or more Performers MAY be entered. The Performers attribute defines the resource that will be responsible for
- * the Process. The Performers entry could be in the form of a specific individual, a group, an organization role or
+ * the Process. The Performers entry could be in the form of a specific individual, a group, an organization role or
* position, or an organization.
*/
List<String> getPerformers();
@@ -98,17 +98,17 @@
List<Assignment> getAssignments();
/**
- * The InputSets attribute defines the data requirements for input to the Process. Zero or more InputSets MAY be
+ * The InputSets attribute defines the data requirements for input to the Process. Zero or more InputSets MAY be
* defined. Each Input set is sufficient to allow the Process to be performed (if it has first been instantiated by
* the appropriate signal arriving from an incoming Sequence Flow)
*/
List<InputSet> getInputSets();
/**
- * The OutputSets attribute defines the data requirements for output from the Process. Zero or more OutputSets MAY be
+ * The OutputSets attribute defines the data requirements for output from the Process. Zero or more OutputSets MAY be
* defined. At the completion of the Process, only one of the OutputSets may be produced--It is up to the
* implementation of the Process to determine which set will be produced. However, the IORules attribute MAY indicate
- * a relationship between an OutputSet and an InputSet that started the Process.
+ * a relationship between an OutputSet and an InputSet that started the Process.
*/
List<OutputSet> getOutputSets();
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/client/ExecutionManagerImpl.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -25,13 +25,14 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.bpm.BPMException;
import org.jboss.bpm.InvalidProcessException;
+import org.jboss.bpm.ProcessTimeoutException;
import org.jboss.bpm.client.ExecutionManager;
import org.jboss.bpm.client.ProcessManager;
import org.jboss.bpm.client.SignalManager;
@@ -48,7 +49,6 @@
import org.jboss.bpm.ri.model.impl.RuntimeProcessImpl;
import org.jboss.bpm.ri.model.impl.SequenceFlowImpl;
import org.jboss.bpm.ri.model.impl.TokenExecutorImpl;
-import org.jboss.bpm.ri.model.impl.TokenExecutorImpl.StartCallback;
import org.jboss.bpm.ri.runtime.DelegatingToken;
import org.jboss.bpm.ri.runtime.MutableToken;
import org.jboss.bpm.ri.runtime.RuntimeProcess;
@@ -73,69 +73,163 @@
private Map<ObjectName, RuntimeProcess> runtimeProcesses = new HashMap<ObjectName, RuntimeProcess>();
@Override
- public void startProcess(final Process proc, final Attachments att)
+ public void startProcess(Process proc, Attachments att)
{
- // Setup the StartCallback that will be called from the Token Executor
- final RuntimeProcessImpl rtProc = new RuntimeProcessImpl(proc);
- final RunnableProcess runnableProcess = new RunnableProcess(rtProc);
- runtimeProcesses.put(proc.getID(), rtProc);
+ // Get the None Start Event if there is one and start the initial flow
+ StartEvent start = getNoneStartEvent(proc);
+ if (start != null)
+ startProcessInternal(start, att);
+ }
+
+ @Override
+ public void startProcess(StartEvent start, Attachments att)
+ {
+ startProcessInternal(start, att);
+ }
+
+ private synchronized void startProcessInternal(StartEvent start, Attachments att)
+ {
+ @SuppressWarnings("serial")
+ class InitialFlow extends SequenceFlowImpl
+ {
+ InitialFlow(StartEvent start)
+ {
+ super(start.getName());
+ setTargetRef(start);
+ }
+ }
- StartCallback startCallback = new StartCallback()
+ Process proc = start.getProcess();
+ ProcessImpl procImpl = (ProcessImpl)proc;
+ RuntimeProcess rtProc = getRuntimeProcess(proc, false);
+ boolean startProcessThread = (rtProc == null);
+
+ // Reset the process if already terminated
+ if (isProcessTerminated(proc))
+ procImpl.resetProcess();
+
+ ProcessStatus procStatus = proc.getProcessStatus();
+ if (procStatus != ProcessStatus.Ready)
+ throw new IllegalStateException("Cannot start process in state: " + procStatus);
+
+ // Register the process if needed
+ ProcessManager pm = ProcessManager.locateProcessManager();
+ if (pm.getProcessByID(proc.getID()) == null)
+ pm.registerProcess(proc);
+
+ rtProc = getRuntimeProcess(proc, true);
+ TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
+ TokenImpl initialToken = new TokenImpl(att);
+ tokenExecutor.create(initialToken, new InitialFlow(start));
+
+ // Start a new process thread
+ if (startProcessThread)
{
- public void start(Token token)
+ RunnableProcess runnable = new RunnableProcess(rtProc);
+ getProcessExecutor().execute(runnable);
+ synchronized (proc)
{
- // Process start time assignments
- startTimeAssignments(proc, token);
+ while (proc.getProcessStatus() != ProcessStatus.Active)
+ {
+ try
+ {
+ log.debug("Wait to become Active " + proc);
+ proc.wait();
+ log.debug("Notified " + proc);
+ }
+ catch (InterruptedException ex)
+ {
+ log.error(ex);
+ }
+ }
+ }
+ }
+
+ // Do the start time assignments
+ startTimeAssignments(proc, initialToken);
+
+ // Start the initial token
+ tokenExecutor.start(initialToken);
+ }
- // Execute the Process through the ExecutorService
- ExecutorService processExecutor = getProcessExecutor();
- log.debug("Execute runnableProcess");
- processExecutor.execute(runnableProcess);
- //new Thread(runnableProcess).start();
+ public ProcessStatus waitForEnd(Process proc)
+ {
+ return waitForEndInternal(proc, 0);
+ }
+ public ProcessStatus waitForEnd(Process proc, long timeout)
+ {
+ return waitForEndInternal(proc, timeout);
+ }
+
+ /**
+ * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
+ * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. If the process was
+ * aborted this method throws the causing RuntimeException if available.
+ */
+ private ProcessStatus waitForEndInternal(Process proc, long timeout)
+ {
+ ProcessImpl procImpl = (ProcessImpl)proc;
+
+ ProcessStatus status = proc.getProcessStatus();
+ if (status == ProcessStatus.None)
+ throw new IllegalStateException("Cannot wait for process in state: " + status);
+
+ // Wait a little for the process to end
+ boolean forever = (timeout < 1);
+ long now = System.currentTimeMillis();
+ long until = now + timeout;
+ try
+ {
+ while (forever || now < until)
+ {
synchronized (proc)
{
- // Wait for the process to become active
- while (proc.getProcessStatus() == ProcessStatus.Ready)
+ if (isProcessTerminated(proc))
{
- try
+ if (procImpl.getRuntimeException() != null)
{
- log.debug("Wait for process to become active");
- proc.wait();
- log.debug("Notified: " + proc);
+ throw new BPMException("Process aborted", procImpl.getRuntimeException());
}
- catch (InterruptedException ex)
+ else
{
- log.error(ex);
+ break;
}
}
+
+ // Start waiting to get notified
+ long waitTimeout = forever ? 0 : until - now;
+ log.debug("Wait for " + waitTimeout + "ms on " + proc);
+ proc.wait(waitTimeout);
+ log.debug("Notified " + proc);
}
+ now = System.currentTimeMillis();
}
- };
- rtProc.setTokenExecutor(new TokenExecutorImpl(rtProc, startCallback));
-
- // Get the None Start Event if there is one and start the initial flow
- StartEvent start = getNoneStartEvent(proc);
- if (start != null)
- startProcessInternal(start, att);
+
+ // Throw timeout exception if it took too long
+ if (isProcessTerminated(proc) == false)
+ {
+ RuntimeException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + proc.getID());
+ procImpl.setRuntimeException(rte);
+ log.error(rte);
+ throw rte;
+ }
+ }
+ catch (InterruptedException ex)
+ {
+ log.warn(ex);
+ }
+
+ status = proc.getProcessStatus();
+ return status;
}
-
- @Override
- public void startProcess(StartEvent start, Attachments att)
+
+ private boolean isProcessTerminated(Process proc)
{
- startProcessInternal(start, att);
+ ProcessStatus status = proc.getProcessStatus();
+ return status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted;
}
-
- private synchronized void startProcessInternal(StartEvent start, Attachments att)
- {
- ObjectName procID = start.getProcess().getID();
- RuntimeProcess rtProc = runtimeProcesses.get(procID);
- TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
- TokenImpl token = new TokenImpl(att);
- tokenExecutor.create(token, new InitialFlow(start));
- tokenExecutor.start(token);
- }
-
+
private StartEvent getNoneStartEvent(Process proc)
{
StartEvent start = null;
@@ -151,6 +245,22 @@
return start;
}
+ private RuntimeProcess getRuntimeProcess(Process proc, boolean createNew)
+ {
+ RuntimeProcess rtProcess;
+ synchronized (runtimeProcesses)
+ {
+ rtProcess = runtimeProcesses.get(proc.getID());
+ if (rtProcess == null && createNew)
+ {
+ TokenExecutorImpl tokenExecutor = new TokenExecutorImpl();
+ rtProcess = new RuntimeProcessImpl(proc, tokenExecutor);
+ runtimeProcesses.put(proc.getID(), rtProcess);
+ }
+ }
+ return rtProcess;
+ }
+
// Evaluate the Start time assignments
private void startTimeAssignments(Process proc, Token token)
{
@@ -169,20 +279,7 @@
}
}
- /**
- * The initial flow to the StartEvent
- */
- @SuppressWarnings("serial")
- class InitialFlow extends SequenceFlowImpl
- {
- InitialFlow(StartEvent start)
- {
- super(start.getName());
- setTargetRef(start);
- }
- }
-
- /**
+ /***************************************************************
* The runnable Process
*/
class RunnableProcess implements Runnable
@@ -213,24 +310,30 @@
procImpl.setProcessStatus(ProcessStatus.Active);
signalManager.throwSignal(proc.getName(), new Signal(proc.getName(), SignalType.SYSTEM_PROCESS_ENTER));
- // Notify that the Process is Active
+ // Notify that the process is now Active
+ log.debug("Notify: " + proc);
proc.notifyAll();
-
+ }
+
+ synchronized (rtProc)
+ {
// Wait until there are no more runnable tokens
while (tokenExecutor.hasRunnableTokens())
{
try
{
- log.debug("Start Waiting");
- proc.wait();
- log.debug("Stop Waiting");
+ log.debug("Wait: " + rtProc);
+ rtProc.wait();
+ log.debug("Notified: " + rtProc);
}
catch (InterruptedException ex)
{
- log.error("Executor thread interrupted");
+ log.error(ex);
}
}
+
log.debug("End execution thread [proc=" + procName + ",status=" + proc.getProcessStatus() + "]");
+ procImpl.setProcessStatus(ProcessStatus.Completed);
}
}
finally
@@ -239,16 +342,13 @@
synchronized (proc)
{
- if (proc.getProcessStatus() == ProcessStatus.Active)
- procImpl.setProcessStatus(ProcessStatus.Completed);
-
+ // Notify that the process has now ended
+ log.debug("Notify: " + proc);
+ proc.notifyAll();
+
ProcessManager procManager = ProcessManager.locateProcessManager();
procManager.unregisterProcess(proc);
runtimeProcesses.remove(procID);
-
- // Notify the Process
- log.debug("Notify Process");
- proc.notifyAll();
}
}
}
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/ProcessImpl.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -223,7 +223,7 @@
return getID();
}
- private void resetProcess()
+ public void resetProcess()
{
log.debug("Reset process: " + this);
for (FlowObject fo : flowObjects)
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/RuntimeProcessImpl.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -38,9 +38,10 @@
private Process process;
private TokenExecutor tokenExecutor;
- public RuntimeProcessImpl(Process process)
+ public RuntimeProcessImpl(Process process, TokenExecutor tokenExecutor)
{
this.process = process;
+ this.tokenExecutor = tokenExecutor;
}
public Process getProcess()
@@ -52,9 +53,4 @@
{
return tokenExecutor;
}
-
- public void setTokenExecutor(TokenExecutor tokenExecutor)
- {
- this.tokenExecutor = tokenExecutor;
- }
}
\ No newline at end of file
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java 2008-08-20 19:01:24 UTC (rev 1944)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/ri/model/impl/TokenExecutorImpl.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -34,13 +34,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.bpm.model.ConnectingObject;
+import org.jboss.bpm.model.FlowObject;
import org.jboss.bpm.model.Process;
import org.jboss.bpm.model.SequenceFlow;
import org.jboss.bpm.model.Process.ProcessStatus;
-import org.jboss.bpm.ri.client.RunnableToken;
+import org.jboss.bpm.ri.runtime.DelegatingToken;
import org.jboss.bpm.ri.runtime.MutableToken;
import org.jboss.bpm.ri.runtime.RuntimeProcess;
import org.jboss.bpm.runtime.FlowHandler;
+import org.jboss.bpm.runtime.HandlerSupport;
+import org.jboss.bpm.runtime.SignalHandler;
import org.jboss.bpm.runtime.Token;
import org.jboss.bpm.runtime.TokenExecutor;
import org.jboss.bpm.runtime.Token.TokenStatus;
@@ -61,109 +64,226 @@
{
void start(Token token);
}
-
- private RuntimeProcess rtProc;
+
private ExecutorService executor = Executors.newCachedThreadPool();
private Map<String, RunnableToken> runnableTokens = new HashMap<String, RunnableToken>();
- private StartCallback startCallback;
- public TokenExecutorImpl(RuntimeProcess rtProc, StartCallback startCallback)
- {
- this.rtProc = rtProc;
- this.startCallback = startCallback;
- }
-
public Set<Token> getRunnableTokens()
{
- Set<Token> tokenSet = new HashSet<Token>();
- for (RunnableToken rt : runnableTokens.values())
- tokenSet.add(rt.getToken());
+ synchronized (runnableTokens)
+ {
+ Set<Token> tokenSet = new HashSet<Token>();
+ for (RunnableToken rt : runnableTokens.values())
+ tokenSet.add(rt.getToken());
- return Collections.unmodifiableSet(tokenSet);
+ return Collections.unmodifiableSet(tokenSet);
+ }
}
public boolean hasRunnableTokens()
{
- return runnableTokens.size() > 0;
+ synchronized (runnableTokens)
+ {
+ return runnableTokens.size() > 0;
+ }
}
public void create(Token token, SequenceFlow initialFlow)
{
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setFlow(initialFlow);
- RunnableToken rtToken = new RunnableToken(rtProc, mutableToken);
- runnableTokens.put(token.getTokenID(), rtToken);
+ synchronized (runnableTokens)
+ {
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setFlow(initialFlow);
+ Process proc = initialFlow.getTargetRef().getProcess();
+ RuntimeProcess rtProc = new RuntimeProcessImpl(proc, this);
+ RunnableToken rtToken = new RunnableToken(rtProc, mutableToken);
+ runnableTokens.put(token.getTokenID(), rtToken);
+ }
}
public void start(Token token)
{
- Process proc = rtProc.getProcess();
- ProcessStatus procStatus = proc.getProcessStatus();
- if (procStatus != ProcessStatus.Ready && procStatus != ProcessStatus.Active)
- throw new IllegalStateException("Cannot start token to process in state: " + procStatus);
+ synchronized (runnableTokens)
+ {
+ ProcessStatus procStatus = getProcess(token).getProcessStatus();
+ if (procStatus != ProcessStatus.Ready && procStatus != ProcessStatus.Active)
+ throw new IllegalStateException("Cannot start token to process in state: " + procStatus);
- log.debug("Start Token: " + token);
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setTokenStatus(TokenStatus.Started);
-
- if (startCallback != null)
- {
- log.debug("Call start callback: " + token);
- startCallback.start(token);
- startCallback = null;
+ RunnableToken rtToken = runnableTokens.get(token.getTokenID());
+ executor.submit(rtToken);
}
-
- RunnableToken rtToken = runnableTokens.get(token.getTokenID());
- executor.submit(rtToken);
}
public void move(Token token, SequenceFlow flow)
{
- if (flow == null)
- throw new IllegalArgumentException("Flow cannot be null");
+ synchronized (runnableTokens)
+ {
+ if (flow == null)
+ throw new IllegalArgumentException("Flow cannot be null");
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setFlow(flow);
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setFlow(flow);
+ }
}
public void stop(Token token)
{
- log.debug("Stop Token: " + token);
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setTokenStatus(TokenStatus.Stoped);
+ synchronized (runnableTokens)
+ {
+ log.debug("Stop Token: " + token);
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setTokenStatus(TokenStatus.Stoped);
+ }
}
public void destroy(Token token)
{
- log.debug("Destroy Token: " + token);
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setTokenStatus(TokenStatus.Destroyed);
- runnableTokens.remove(token.getTokenID());
+ synchronized (runnableTokens)
+ {
+ log.debug("Destroy Token: " + token);
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setTokenStatus(TokenStatus.Destroyed);
+ runnableTokens.remove(token.getTokenID());
+ }
}
public String suspend(Token token)
{
- log.debug("Suspend Token: " + token);
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setTokenStatus(TokenStatus.Suspended);
- return token.getTokenID();
+ synchronized (runnableTokens)
+ {
+ log.debug("Suspend Token: " + token);
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setTokenStatus(TokenStatus.Suspended);
+ return token.getTokenID();
+ }
}
public Token activate(String tokenID)
{
- RunnableToken rtToken = runnableTokens.get(tokenID);
- if (rtToken == null)
- throw new IllegalStateException("Not a runnable token: " + tokenID);
+ synchronized (runnableTokens)
+ {
+ RunnableToken rtToken = runnableTokens.get(tokenID);
+ if (rtToken == null)
+ throw new IllegalStateException("Not a runnable token: " + tokenID);
- Token token = rtToken.getToken();
- if (token.getTokenStatus() != TokenStatus.Suspended)
- throw new IllegalStateException("Activate token in state: " + token.getTokenStatus());
+ Token token = rtToken.getToken();
+ if (token.getTokenStatus() != TokenStatus.Suspended)
+ throw new IllegalStateException("Activate token in state: " + token.getTokenStatus());
- log.debug("Activate Token: " + token);
- MutableToken mutableToken = (MutableToken)token;
- mutableToken.setTokenStatus(TokenStatus.Started);
+ log.debug("Activate Token: " + token);
+ MutableToken mutableToken = (MutableToken)token;
+ mutableToken.setTokenStatus(TokenStatus.Started);
- executor.submit(rtToken);
- return token;
+ executor.submit(rtToken);
+ return token;
+ }
}
+
+ private Process getProcess(Token token)
+ {
+ return token.getFlow().getTargetRef().getProcess();
+ }
+
+ /****************************************************
+ * The runnable Token
+ */
+ class RunnableToken implements Runnable
+ {
+ private RuntimeProcess rtProc;
+ private MutableToken token;
+
+ public RunnableToken(RuntimeProcess rtProc, MutableToken token)
+ {
+ this.rtProc = rtProc;
+ this.token = token;
+ }
+
+ public Token getToken()
+ {
+ return token;
+ }
+
+ public void run()
+ {
+ Process proc = rtProc.getProcess();
+ TokenExecutor tokenExecutor = rtProc.getTokenExecutor();
+ try
+ {
+ ConnectingObject flow = token.getFlow();
+ if (flow == null)
+ throw new IllegalStateException("Cannot obtain initial flow");
+
+ TokenStatus tokStatus = token.getTokenStatus();
+ ProcessStatus procStatus = proc.getProcessStatus();
+ while (procStatus == ProcessStatus.Active && tokStatus == TokenStatus.Started)
+ {
+ // Get the target and its handlers
+ FlowObject flowObject = token.getFlow().getTargetRef();
+ FlowObjectImpl flowObjectImpl = (FlowObjectImpl)flowObject;
+ SignalHandler sigHandler = getSignalHandler(flowObject);
+
+ // Synchronize execution on the target FlowObject
+ synchronized (flowObject)
+ {
+ // Throw the Enter Signal
+ sigHandler.throwEnterSignal();
+
+ // Create a Token that includes properties from the current Activity
+ DelegatingToken delegatingToken = new DelegatingToken(token);
+
+ // Execute the target FlowObject
+ flowObjectImpl.execute(delegatingToken);
+
+ // Transfer the token to the FlowHandler
+ flowObjectImpl.executeFlowHandler(tokenExecutor, delegatingToken);
+
+ // Throw the Exit Signal
+ sigHandler.throwExitSignal();
+
+ tokStatus = token.getTokenStatus();
+ procStatus = proc.getProcessStatus();
+ }
+ }
+
+ // Notify Process on token termination
+ terminateToken(proc, tokenExecutor);
+ }
+ catch (RuntimeException rte)
+ {
+ log.error("Process aborted: " + proc, rte);
+ ((ProcessImpl)proc).setRuntimeException(rte);
+
+ // Notify Process on token termination
+ terminateToken(proc, tokenExecutor);
+ }
+ }
+
+ // Remove the token from the list of runnable tokens then notify the process
+ private void terminateToken(Process proc, TokenExecutor tokenExecutor)
+ {
+ tokenExecutor.destroy(token);
+ synchronized (rtProc)
+ {
+ log.debug("Notify: " + rtProc);
+ rtProc.notifyAll();
+ }
+ }
+
+ private SignalHandler getSignalHandler(FlowObject target)
+ {
+ HandlerSupport handlerSupport = getHandlerSupport(target);
+ SignalHandler handler = handlerSupport.getSignalHandler();
+ if (handler == null)
+ throw new IllegalStateException("Cannot obtain signal handler from: " + target);
+
+ return handler;
+ }
+
+ private HandlerSupport getHandlerSupport(FlowObject fo)
+ {
+ if (fo instanceof HandlerSupport == false)
+ throw new IllegalStateException("Flow object does not implement handler support: " + fo);
+ return (HandlerSupport)fo;
+ }
+ }
}
\ No newline at end of file
Added: jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java 2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.bpm.runtime;
+
+// $Id$
+
+import org.jboss.bpm.client.ExecutionManager;
+import org.jboss.bpm.model.Process;
+import org.jboss.bpm.model.ProcessBuilder;
+import org.jboss.bpm.model.ProcessBuilderFactory;
+import org.jboss.bpm.model.Process.ProcessStatus;
+import org.jboss.bpm.test.DefaultEngineTestCase;
+
+/**
+ * Test the ExecutionManager
+ *
+ * @author thomas.diesler at jboss.com
+ * @since 08-Jul-2008
+ */
+public class ExecutionManagerTest extends DefaultEngineTestCase
+{
+ public void testSequenceFlow() throws Exception
+ {
+ ProcessBuilder procBuilder = ProcessBuilderFactory.newInstance().newProcessBuilder();
+ procBuilder.addProcess(getName()).addStartEvent("Start").addSequenceFlow("Task");
+ procBuilder.addTask("Task").addSequenceFlow("End").addEndEvent("End");
+ Process proc = procBuilder.getProcess();
+
+ ExecutionManager em = ExecutionManager.locateExecutionManager();
+ em.startProcess(proc, null);
+
+ ProcessStatus status = em.waitForEnd(proc);
+ assertEquals(ProcessStatus.Completed, status);
+ }
+
+ public void testRestartSequenceFlow() throws Exception
+ {
+ ProcessBuilder procBuilder = ProcessBuilderFactory.newInstance().newProcessBuilder();
+ procBuilder.addProcess(getName()).addStartEvent("Start").addSequenceFlow("Task");
+ procBuilder.addTask("Task").addSequenceFlow("End").addEndEvent("End");
+ Process proc = procBuilder.getProcess();
+
+ ExecutionManager em = ExecutionManager.locateExecutionManager();
+ em.startProcess(proc, null);
+
+ ProcessStatus status = em.waitForEnd(proc);
+ assertEquals(ProcessStatus.Completed, status);
+
+ em.startProcess(proc, null);
+
+ status = em.waitForEnd(proc);
+ assertEquals(ProcessStatus.Completed, status);
+ }
+}
Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/java/org/jboss/bpm/runtime/ExecutionManagerTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml 2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,45 @@
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <!-- The KernelLocator -->
+ <bean name="KernelLocator" class="org.jboss.kernel.plugins.util.KernelLocator"/>
+
+ <!-- The Builder Factories -->
+ <bean name="jBPMProcessBuilderFactory" class="org.jboss.bpm.ri.model.impl.ProcessBuilderFactoryImpl"/>
+ <bean name="jBPMMessageBuilderFactory" class="org.jboss.bpm.ri.model.impl.MessageBuilderFactoryImpl"/>
+ <bean name="jBPMPropertyBuilderFactory" class="org.jboss.bpm.ri.model.impl.PropertyBuilderFactoryImpl"/>
+
+ <!-- The ProcessEngine -->
+ <bean name="jBPMProcessEngine" class="org.jboss.bpm.ri.client.ProcessEngineImpl">
+ <property name="processManager"><inject bean="jBPMProcessManager"/></property>
+ <property name="executionManager"><inject bean="jBPMExecutionManager"/></property>
+ <property name="signalManager"><inject bean="jBPMSignalManager"/></property>
+ <property name="messageManager"><inject bean="jBPMMessageManager"/></property>
+ </bean>
+
+ <!-- The Managers -->
+ <bean name="jBPMExecutionManager" class="org.jboss.bpm.ri.client.ExecutionManagerImpl"/>
+ <bean name="jBPMSignalManager" class="org.jboss.bpm.ri.client.SignalManagerImpl"/>
+ <bean name="jBPMMessageManager" class="org.jboss.bpm.ri.client.MessageManagerImpl"/>
+
+ <!-- The ProcessManager -->
+ <bean name="jBPMProcessManager" class="org.jboss.bpm.ri.client.ProcessManagerImpl">
+ <property name="dialectRegistry"><inject bean="jBPMDialectRegistry"/></property>
+ <property name="dialectHandlers">
+ <map keyClass="java.lang.String" valueClass="org.jboss.bpm.client.DialectHandler">
+ </map>
+ </property>
+ </bean>
+
+ <!-- The DialectRegistry -->
+ <bean name="jBPMDialectRegistry" class="org.jboss.bpm.client.DialectRegistry">
+ <property name="registry">
+ <map keyClass="java.lang.String" valueClass="java.lang.String">
+ <entry><key>urn:bpm.jboss:pdl-0.1</key><value>api10</value></entry>
+ <entry><key>urn:jbpm.org:jpdl-3.2</key><value>jpdl32</value></entry>
+ <entry><key>http://stp.eclipse.org/bpmn</key><value>stp</value></entry>
+ <entry><key>http://www.wfmc.org/2008/XPDL2.1</key><value>xpdl21</value></entry>
+ </map>
+ </property>
+ </bean>
+
+</deployment>
\ No newline at end of file
Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/resources/jbpm-beans.xml
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml (rev 0)
+++ jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml 2008-08-20 19:39:22 UTC (rev 1945)
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <!-- ================================= -->
+ <!-- Preserve messages in a local file -->
+ <!-- ================================= -->
+
+ <appender name="FILE" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${log4j.output.dir}/test.log"/>
+ <param name="Append" value="false"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <param name="ConversionPattern" value="%d %-5p [%c:%L] %m%n"/>
+ </layout>
+ </appender>
+
+ <!-- ============================== -->
+ <!-- Append messages to the console -->
+ <!-- ============================== -->
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <param name="Threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{HH:mm:ss,SSS} [%t] %-5p %C{1} : %m%n" />
+ </layout>
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <category name="org.jbpm">
+ <priority value="DEBUG" />
+ </category>
+
+ <category name="org.hibernate">
+ <priority value="INFO" />
+ </category>
+
+ <!-- hide optimistic locking failures -->
+ <category name="org.hibernate.event.def.AbstractFlushingEventListener">
+ <priority value="FATAL" />
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <!--appender-ref ref="CONSOLE"/-->
+ <appender-ref ref="FILE"/>
+ </root>
+
+</log4j:configuration>
Property changes on: jbossbpm/spec/trunk/modules/ri/src/test/resources/log4j.xml
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the jbpm-commits
mailing list