[jbpm-commits] JBoss JBPM SVN: r5080 - in jbpm4/trunk/modules/test-concurrent/src: test/java/org/jbpm/test/concurrent and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jun 22 10:29:53 EDT 2009
Author: jbarrez
Date: 2009-06-22 10:29:53 -0400 (Mon, 22 Jun 2009)
New Revision: 5080
Removed:
jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java
Modified:
jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java
jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java
jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java
jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml
Log:
Work in progress: concurrency testing
Modified: jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java 2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java 2009-06-22 14:29:53 UTC (rev 5080)
@@ -80,6 +80,8 @@
private CyclicBarrier afterExecutionBarrier;
+ private List<SynchronizableCommandExecutor> executorsSyncedAfterExecution;
+
private CyclicBarrier beforeExecutionBarrier;
/**
@@ -187,11 +189,13 @@
}
if (afterExecutionBarrier == null ) {
- afterExecutionBarrier = new CyclicBarrier(2);
+ setSyncPointForAfterExecution(new CyclicBarrier(2));
} else {
- afterExecutionBarrier = new CyclicBarrier(afterExecutionBarrier.getParties() + 1);
+ setSyncPointForAfterExecution(new CyclicBarrier(afterExecutionBarrier.getParties() + 1));
}
+ setSyncPointForAfterExecution(afterExecutionBarrier);
+
return this;
}
@@ -200,6 +204,11 @@
throw new RuntimeException("Cannot set synchronization point once the thread has been started");
}
+ if (executorsSyncedAfterExecution == null) {
+ executorsSyncedAfterExecution = new ArrayList<SynchronizableCommandExecutor>();
+ }
+ executorsSyncedAfterExecution.add(otherExecutor);
+
int threadsInvolved = 0;
if (afterExecutionBarrier != null) {
threadsInvolved += afterExecutionBarrier.getParties();
@@ -213,12 +222,21 @@
threadsInvolved++;
}
- this.afterExecutionBarrier = new CyclicBarrier(threadsInvolved);
- otherExecutor.afterExecutionBarrier = this.afterExecutionBarrier;
+ setSyncPointForAfterExecution(new CyclicBarrier(threadsInvolved));
return this;
}
+ private void setSyncPointForAfterExecution(CyclicBarrier syncpoint) {
+ this.afterExecutionBarrier = syncpoint;
+ if (executorsSyncedAfterExecution == null) {
+ executorsSyncedAfterExecution = new ArrayList<SynchronizableCommandExecutor>();
+ }
+ for (SynchronizableCommandExecutor executor : executorsSyncedAfterExecution) {
+ executor.afterExecutionBarrier = this.afterExecutionBarrier;
+ }
+ }
+
public SynchronizableCommandExecutor synchroniseBeforeExecution() {
if (isAlive()) {
throw new RuntimeException("Cannot set synchronization point once the JobExecutorEmulator has been started");
Modified: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java 2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java 2009-06-22 14:29:53 UTC (rev 5080)
@@ -6,10 +6,24 @@
import org.jbpm.api.ProcessInstance;
import org.jbpm.api.job.Job;
-
+/**
+ * Concurrency test case: when using an async fork with differen outgoing
+ * transitions, potentially conflicts can occur (eg when the outgoing paths
+ * come together in the Join activity at the same time).
+ *
+ * @author jbarrez
+ */
public class AsyncForkTest extends ConcurrentJbpmTestCase {
- public void testAsyncForkNoOptimisticLockingFailure() {
+ /**
+ * Test case using an async fork with 2 outgoing transactions.
+ * In jBPM3, a StaleStateException was thrown when the different paths
+ * came together in the Join at the same time.
+ *
+ * However, the jBPM4 Join activity is designed with concurrency in mind,
+ * and the StaleStateException should not occur when this scenario happens.
+ */
+ public void testAsyncForkNoOptimisticLockingFailure() {
deployJpdlXmlString(
"<process name='asyncFork'>" +
" <start>" +
@@ -26,12 +40,19 @@
" <custom name='pathB' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
" <transition to='theJoin' />" +
" </custom>" +
- " <join name='theJoin'>" +
- " <transition to='test' />" +
+ // Can't test with default lock-mode (upgrade). SELECT ... FOR UPGRADE
+ // will block transactions at database level with no decent approach
+ // to check if the thread is blocking. So we use the default lockmode,
+ // which is the standard Hibernate optimistic locking.
+ //
+ // Note: not using lockmode upgrade can cause the Join logic to
+ // work with incorrect data: ie it could be that an incoming
+ // transition is not seen as the 'last' one, due to a concurrent
+ // read of data. This siutation is avoided in the test by executing
+ // the Join activity logic of the last transition only after the other one.
+ " <join name='theJoin' lockmode='none'>" +
+ " <transition to='end' />" +
" </join>" +
- " <state name='test' >" +
- " <transition to='end' /> " +
- " </state>" +
" <end name='end' />" +
"</process>"
);
@@ -40,19 +61,30 @@
final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
assertEquals(2, jobs.size());
- SynchronizableCommandExecutor executor1 = new SynchronizableCommandExecutor(environmentFactory, jobs.get(0));
- SynchronizableCommandExecutor executor2 = new SynchronizableCommandExecutor(environmentFactory, jobs.get(1));
+ SynchronizableCommandExecutor executor1 = startThreadAndSyncAfterExecution(jobs.get(0));
+ SynchronizableCommandExecutor executor2 = startThreadAndSyncAfterExecution(jobs.get(1));
- executor1.synchroniseAfterExecution(executor2);
- executor1.synchroniseAfterExecution();
-
- executor1.start();
- executor2.start();
+ try {
+ executor1.join();
+ executor2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
if (executor1.getException() instanceof StaleStateException
|| executor2.getException() instanceof StaleStateException) {
fail("A StaleStaeException was thrown, altough this shouldn't happen");
}
+
+ assertProcessInstanceEnded(processInstance);
}
+
+ private SynchronizableCommandExecutor startThreadAndSyncAfterExecution(Job job) {
+ SynchronizableCommandExecutor executor = new SynchronizableCommandExecutor(environmentFactory, job);
+ executor.synchroniseAfterExecution();
+ executor.start();
+ executor.waitUntilExecutionFinished(false);
+ return executor;
+ }
}
Deleted: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java 2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java 2009-06-22 14:29:53 UTC (rev 5080)
@@ -1,255 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-/**
- *
- */
-package org.jbpm.test.concurrent;
-
-import java.util.List;
-import java.util.concurrent.Semaphore;
-
-import org.jbpm.api.Execution;
-import org.jbpm.api.ProcessInstance;
-import org.jbpm.api.env.Environment;
-import org.jbpm.api.env.EnvironmentFactory;
-import org.jbpm.api.job.Job;
-import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
-import org.jbpm.pvm.internal.tx.StandardTransaction;
-import org.jbpm.test.JbpmTestCase;
-
-
-/**
- * Class to test different approaches to the concurrency problem.
- *
- * Doesnt work anymore, but dont delete yet, I need some stuff in here for later!
- * * @author Joram Barrez
- */
-public class OptimisticLockTestGround extends JbpmTestCase {
-
- private EnvironmentFactory environmentFactory;
-
- protected void setUp() throws Exception {
- super.setUp();
- this.environmentFactory = (EnvironmentFactory) processEngine; // Better way to do this?
- }
-
-
- public void testMe() throws Exception {
- deployJpdlXmlString(
- "<process name='asyncFork'>" +
- " <start>" +
- " <transition to='theFork' />" +
- " </start>" +
- " <fork name='theFork'>" +
- " <on event='end' continue='async' />" +
- " <transition to='pathA' />" +
- " <transition to='pathB' />" +
- " </fork>" +
- " <custom name='pathA' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
- " <transition to='theJoin' />" +
- " </custom>" +
- " <custom name='pathB' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
- " <transition to='theJoin' />" +
- " </custom>" +
- " <join name='theJoin' lockmode='none'>" +
- " <transition to='end' />" +
- " </join>" +
- " <end name='end' />" +
- "</process>"
- );
-
- ProcessInstance processInstance = executionService.startProcessInstanceByKey("asyncFork");
- final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
- assertEquals(2, jobs.size());
-
- JobExecutorEmulator jobExecutorEmulator1 = new JobExecutorEmulator(jobs.get(0));
- jobExecutorEmulator1.start();
-
- JobExecutorEmulator jobExecutorEmulator2 = new JobExecutorEmulator(jobs.get(1));
- jobExecutorEmulator2.start();
-
- // TODO check if transitions are marked for rollback -> exception happened
-
- jobExecutorEmulator1.join();
- if (jobExecutorEmulator1.getException() != null) {
- fail("Error while executing job: " + jobExecutorEmulator1.getException().getMessage());
- }
-
- jobExecutorEmulator2.join();
- if (jobExecutorEmulator2.getException() != null) {
- fail("Error while executing job: " + jobExecutorEmulator2.getException().getMessage());
- }
-
- }
-
- public void testMeToo() throws Exception {
-
- semaphore = new Semaphore(0);
-
- deployJpdlXmlString(
- "<process name='timer_vs_signal'>" +
- " <start>" +
- " <transition to='wait' />" +
- " </start>" +
- " <state name='wait'>" +
- " <transition name='timeout' to='end'>" +
- " <timer duedate='1 second' />" +
- " </transition>" +
- " <transition to='end' name='go on' />" +
- " </state>" +
- " <end name='end' />" +
- "</process>"
- );
-
- ProcessInstance processInstance = executionService.startProcessInstanceByKey("timer_vs_signal");
- final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
- assertEquals(1, jobs.size());
-
- JobExecutorEmulator jobExecutorEmulator1 = new JobExecutorEmulator(jobs.get(0));
- jobExecutorEmulator1.start();
-
- while (!semaphore.hasQueuedThreads()) {
- synchronized (this) {
- System.out.println("------------------> waiting ...");
- wait(20L);
- }
- }
-
- // cause conflict
- System.out.println("---------------------------> Causing MayHem");
-
- Execution executionAtState = processInstance.findActiveExecutionIn("wait");
- assertNotNull(executionAtState);
- executionService.signalExecutionById(executionAtState.getId(), "go on");
- semaphore.release();
- System.out.println("----------------------------> Released semaphore");
-
- // TODO check if transitions are marked for rollback -> exception happened
-
- jobExecutorEmulator1.join();
- if (jobExecutorEmulator1.getException() != null) {
- fail("Error while executing job: " + jobExecutorEmulator1.getException().getMessage());
- }
-
- }
-
- // Todo refactor
- private static Semaphore semaphore = new Semaphore(1); // binary semaphore
-
-
- private class JobExecutorEmulator extends Thread {
-
- private Long jobId;
-
- private Exception exception;
-
- public JobExecutorEmulator(Job job) {
- this.jobId = job.getDbid();
- }
-
- public void run() {
-
- Environment environment = environmentFactory.openEnvironment();
- StandardTransaction standardTransaction = environment.get(StandardTransaction.class);
- standardTransaction.begin();
-
- try {
-
- System.out.println(Thread.currentThread().getName() + "----------------------------------------> executing job " + jobId);
- ExecuteJobCmd executeJobCmd = new ExecuteJobCmd(jobId);
- executeJobCmd.execute(environment);
-
- System.out.println(Thread.currentThread().getName() +"-------------------------------------------------------> DONE EXECUTING JOB " + jobId);
-
- } catch (Exception e) {
- standardTransaction.setRollbackOnly();
- this.exception = e;
-
- System.out.println(Thread.currentThread().getName() +" -----------------------------------------------------> IM A FAILURE");
- } finally {
-
-
- // error: both threads entering at the same time!
- /*
- boolean acquired = false;
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> BEFORE SYNC BLOCK");
- synchronized (semaphore) {
- acquired = semaphore.tryAcquire();
- }
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> AFTER SYNC BLOCK");
- if (acquired) {
-
- try {
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> ACQUIRING");
- synchronized (semaphore) {
- semaphore.wait();
- }
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> FREED FROM ACQUIRED");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- */
-
-
- if (!semaphore.hasQueuedThreads()) {
- try {
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> ACQUIRING");
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> FREED FROM ACQUIRED");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-
-
- if (standardTransaction.isRollbackOnly()) {
- exception = new Exception("Transaction was rollbacked");
- }
-
- System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> COMPLETING ");
- standardTransaction.complete();
-
- System.out.println(Thread.currentThread().getName() + "------------------------------------------------------> RELEASING");
-
- /*if (!acquired) {
- semaphore.release();
- synchronized (semaphore) {
- semaphore.notify();
- }
- }*/
-
- }
- environment.close();
- System.out.println(Thread.currentThread().getName() + " has finished");
- }
-
-
- public Exception getException() {
- return exception;
- }
-
-
- }
-
-}
Modified: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java 2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java 2009-06-22 14:29:53 UTC (rev 5080)
@@ -34,11 +34,18 @@
/**
+ * Concurrency test case: when a timer is defined on an activity, a job
+ * will be executed by the jobExecutor. But when at the same time a
+ * signal is done on the same activity, a conflicting situation occurs.
+ *
+ * This test case will mimic this behaviour to understand how the different
+ * databases react on such a conflict.
+ *
* @author Joram Barrez
*/
public class TimerVsSignalConcurrencyTest extends ConcurrentJbpmTestCase {
- public void testStaleObjectExceptionThrown() throws Exception {
+ public void testStaleObjectExceptionThrown() {
deployJpdlXmlString(
"<process name='timer_vs_signal'>" +
@@ -76,13 +83,22 @@
// Best effort: wait 1 sec and see if the staleObjectException has been caused
synchronized (this) {
- wait(1000L);
+ try {
+ wait(1000L);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
jobExecutorEmulator.goOn();
- jobExecutorEmulator.join();
- signalThread.join();
+ try {
+ jobExecutorEmulator.join();
+ signalThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
if (!(jobExecutorEmulator.getException() instanceof StaleStateException
Modified: jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml 2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml 2009-06-22 14:29:53 UTC (rev 5080)
@@ -22,7 +22,7 @@
<property name="hibernate.connection.password">sa</property>
<property name="hibernate.default_schema">PUBLIC</property>
<property name="hibernate.dialect">org.hibernate.dialect.H2Dialect</property>
-
+
<!-- POSTGRES config
<property name="hibernate.connection.driver_class">org.h2.Driver</property>
@@ -30,7 +30,7 @@
<property name="hibernate.connection.username">postgres</property>
<property name="hibernate.connection.password">postgres</property>
<property name="hibernate.dialect">org.hibernate.dialect.PostgreSQLDialect</property>
- -->
+ -->
<property name="hibernate.hbm2ddl.auto">create-drop</property>
<property name="hibernate.format_sql">true</property>
More information about the jbpm-commits
mailing list