[jbpm-commits] JBoss JBPM SVN: r1890 - in jbossbpm/spec/trunk/modules: ri/src/main/java/org/jboss/bpm/client/internal and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 14 10:40:00 EDT 2008
Author: thomas.diesler at jboss.com
Date: 2008-08-14 10:39:59 -0400 (Thu, 14 Aug 2008)
New Revision: 1890
Modified:
jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java
jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java
jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java
jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java
Log:
Fix synchronization issues
Modified: jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java
===================================================================
--- jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/api/src/main/java/org/jboss/bpm/client/ProcessManager.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -41,7 +41,6 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.bpm.EngineShutdownException;
import org.jboss.bpm.model.Process;
-import org.jboss.bpm.model.Process.ProcessStatus;
import org.jboss.bpm.runtime.Attachments;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -159,6 +158,7 @@
*/
public void destroyProcess(Process proc)
{
+ log.debug("destroyProcess: " + proc);
procs.remove(proc.getID());
}
@@ -191,14 +191,6 @@
}
/**
- * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
- * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. <p/> If the process
- * was aborted this method throws the causing RuntimeException if avaialable.
- *
- */
- public abstract ProcessStatus waitForEnd(ObjectName procID, long timeout);
-
- /**
* Get the handler for the dialect with the given namespace URI
*/
public DialectHandler getDialectHandler(String nsURI)
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ExecutionManagerImpl.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -136,20 +136,23 @@
boolean hasActiveTokens = tokenExecutor.hasActiveTokens();
try
{
- while (procStatus == ProcessStatus.Active && hasActiveTokens)
+ synchronized (procImpl)
{
- try
+ while (procStatus == ProcessStatus.Active && hasActiveTokens)
{
- Thread.sleep(100);
+ try
+ {
+ procImpl.wait();
+ }
+ catch (InterruptedException ex)
+ {
+ log.error("Executor thread interrupted");
+ }
+ procStatus = procImpl.getProcessStatus();
+ hasActiveTokens = tokenExecutor.hasActiveTokens();
}
- catch (InterruptedException ex)
- {
- log.error("Executor thread interrupted");
- }
- procStatus = procImpl.getProcessStatus();
- hasActiveTokens = tokenExecutor.hasActiveTokens();
+ log.debug("End execution thread [status=" + procStatus + ",tokens=" + tokenExecutor.getActiveTokens() + "]");
}
- log.debug("End execution thread [status=" + procStatus + ",tokens=" + tokenExecutor.getActiveTokens() + "]");
}
finally
{
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/ProcessManagerImpl.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -29,15 +29,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.bpm.BPMException;
-import org.jboss.bpm.ProcessTimeoutException;
import org.jboss.bpm.client.DialectHandler;
import org.jboss.bpm.client.DialectRegistry;
import org.jboss.bpm.client.ExecutionManager;
import org.jboss.bpm.client.ProcessManager;
import org.jboss.bpm.model.Process;
-import org.jboss.bpm.model.Process.ProcessStatus;
-import org.jboss.bpm.model.internal.ProcessImpl;
import org.jboss.bpm.runtime.Attachments;
/**
@@ -75,72 +71,15 @@
exm.startProcess(proc, att);
}
- @Override
- public ProcessStatus waitForEnd(ObjectName procID, long timeout)
- {
- Process proc = getProcessStrict(procID);
- ProcessImpl procImpl = (ProcessImpl)proc;
- ProcessStatus status = proc.getProcessStatus();
-
- if (status == ProcessStatus.None || status == ProcessStatus.Ready)
- throw new IllegalStateException("Cannot wait for process in state: " + status);
-
- // Get the executor thread for interuption
- Thread executorThread = procImpl.getExecutorThread();
-
- // Wait a little for the process to end
- boolean forever = (timeout < 1);
- long now = System.currentTimeMillis();
- long until = now + timeout;
- try
- {
- while (forever || now < until)
- {
- if (status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted)
- {
- RuntimeException rte = procImpl.getRuntimeException();
- if (rte != null)
- throw new BPMException("Process aborted", rte);
-
- break;
- }
-
- // Join the executor thread
- executorThread.join(timeout);
- now = System.currentTimeMillis();
- status = proc.getProcessStatus();
- }
-
- // Throw timeout exception if it took too long
- if (status != ProcessStatus.Cancelled && status != ProcessStatus.Completed && status != ProcessStatus.Aborted)
- {
- ProcessTimeoutException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + procID);
- log.error(rte);
-
- log.error("Interrupt executor thread");
- executorThread.interrupt();
-
- throw rte;
- }
- }
- catch (InterruptedException ex)
- {
- log.warn(ex);
- }
- finally
- {
- // Destroy the process
- destroyProcess(procImpl);
- }
-
- return status;
- }
-
private Process getProcessStrict(ObjectName procID)
{
Process proc = getProcessByID(procID);
if (proc == null)
- throw new IllegalStateException("Cannot obtain process: " + procID);
+ {
+ IllegalStateException rte = new IllegalStateException("Cannot obtain process: " + procID);
+ log.error(rte);
+ throw rte;
+ }
return proc;
}
}
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/client/internal/RunnableToken.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -110,7 +110,6 @@
{
signalManager.throwSignal(procImpl.getName(), sigHandler.getExitSignal());
}
-
}
tokStatus = token.getTokenStatus();
@@ -125,14 +124,19 @@
catch (RuntimeException rte)
{
log.error("Process aborted: " + procImpl, rte);
- token.setTokenStatus(TokenStatus.Destroyed);
procImpl.setProcessStatus(ProcessStatus.Aborted);
procImpl.setRuntimeException(rte);
}
-// finally
-// {
-// procImpl.notifyAll();
-// }
+ finally
+ {
+
+ token.setTokenStatus(TokenStatus.Destroyed);
+ // Notify the ExecutionManager when a token terminates
+ synchronized (procImpl)
+ {
+ procImpl.notifyAll();
+ }
+ }
}
private SignalHandler getSignalHandler(FlowObject target)
Modified: jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java
===================================================================
--- jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/ri/src/main/java/org/jboss/bpm/model/internal/ProcessImpl.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -35,8 +35,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.bpm.BPMException;
import org.jboss.bpm.InvalidProcessException;
import org.jboss.bpm.NotImplementedException;
+import org.jboss.bpm.ProcessTimeoutException;
import org.jboss.bpm.client.ProcessManager;
import org.jboss.bpm.client.internal.ProcessManagerImpl;
import org.jboss.bpm.model.EndEvent;
@@ -235,16 +237,69 @@
public void waitForEnd()
{
- ProcessManager pm = ProcessManager.locateProcessManager();
- pm.waitForEnd(getID(), 0);
+ waitForEndInternal(0);
}
public void waitForEnd(long timeout)
{
- ProcessManager pm = ProcessManager.locateProcessManager();
- pm.waitForEnd(getID(), timeout);
+ waitForEndInternal(timeout);
}
+
+ /**
+ * Wait for the Process to end. All Tokens that are generated at the Start Event for that Process must eventually
+ * arrive at an End Event. The Process will be in a running state until all Tokens are consumed. If the process
+ * was aborted this method throws the causing RuntimeException if available.
+ */
+ private void waitForEndInternal(long timeout)
+ {
+ if (status == ProcessStatus.None || status == ProcessStatus.Ready)
+ throw new IllegalStateException("Cannot wait for process in state: " + status);
+ // Wait a little for the process to end
+ boolean forever = (timeout < 1);
+ long now = System.currentTimeMillis();
+ long until = now + timeout;
+ try
+ {
+ while (forever || now < until)
+ {
+ if (status == ProcessStatus.Cancelled || status == ProcessStatus.Completed || status == ProcessStatus.Aborted)
+ {
+ if (runtimeException != null)
+ throw new BPMException("Process aborted", runtimeException);
+
+ break;
+ }
+
+ // Join the executor thread
+ executorThread.join(timeout);
+ now = System.currentTimeMillis();
+ }
+
+ // Throw timeout exception if it took too long
+ if (status != ProcessStatus.Cancelled && status != ProcessStatus.Completed && status != ProcessStatus.Aborted)
+ {
+ ProcessTimeoutException rte = new ProcessTimeoutException("Process timeout after " + timeout + "ms for: " + getID());
+ log.error(rte);
+
+ log.error("Interrupt executor thread");
+ executorThread.interrupt();
+
+ throw rte;
+ }
+ }
+ catch (InterruptedException ex)
+ {
+ log.warn(ex);
+ }
+ finally
+ {
+ // Destroy the process
+ ProcessManager procManager = ProcessManager.locateProcessManager();
+ procManager.destroyProcess(this);
+ }
+ }
+
public FlowObject getFlowObject(String name)
{
if (name == null)
Modified: jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/samples/airticket/server/src/test/java/org/jboss/bpm/samples/airticket/ProcessMarshallerTest.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -91,6 +91,7 @@
URL expURL = getResourceURL("samples/airticket/airticket-api10.xml");
ProcessManager pm = ProcessManager.locateProcessManager();
Process proc = pm.createProcess(expURL);
+ pm.destroyProcess(proc);
StringWriter strwr = new StringWriter();
DialectHandler dh = pm.getDialectHandler("urn:bpm.jboss:pdl-0.1");
Modified: jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java
===================================================================
--- jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java 2008-08-14 13:29:28 UTC (rev 1889)
+++ jbossbpm/spec/trunk/modules/testsuite/src/test/java/org/jboss/bpm/cts/process/ProcessStartTest.java 2008-08-14 14:39:59 UTC (rev 1890)
@@ -27,6 +27,8 @@
import javax.management.ObjectName;
+import org.jboss.bpm.client.SignalListener;
+import org.jboss.bpm.client.SignalManager;
import org.jboss.bpm.model.Process;
import org.jboss.bpm.model.ProcessBuilder;
import org.jboss.bpm.model.ProcessBuilderFactory;
@@ -42,6 +44,8 @@
*/
public class ProcessStartTest extends DefaultEngineTestCase
{
+ private RuntimeException startException;
+
public void testStartProcessReturn() throws Exception
{
Process proc = getProcess();
@@ -52,19 +56,34 @@
public void testStartTwice() throws Exception
{
- Process proc = getProcess();
- ObjectName procID = proc.getID();
- assertEquals(procID, proc.startProcess());
- try
+ startException = null;
+
+ final Process proc = getProcess();
+ SignalListener signalListener = new SignalListener()
{
- proc.startProcess();
- fail("Cannot start process twice");
- }
- catch (RuntimeException rte)
- {
- // expected;
- }
+ public void catchSignal(Signal signal)
+ {
+ if (signal.getSignalType() == SignalType.SYSTEM_TASK_ENTER)
+ {
+ try
+ {
+ proc.startProcess();
+ fail("Cannot start process twice");
+ }
+ catch (RuntimeException rte)
+ {
+ startException = rte;
+ }
+ }
+ }
+ };
+ SignalManager sm = SignalManager.locateSignalManager();
+ sm.addSignalListener(getName(), signalListener);
+
+ proc.startProcess();
proc.waitForEnd();
+
+ assertNotNull("Start exception expected", startException);
}
public void testProcessRestart() throws Exception
More information about the jbpm-commits
mailing list