[jbpm-commits] JBoss JBPM SVN: r5080 - in jbpm4/trunk/modules/test-concurrent/src: test/java/org/jbpm/test/concurrent and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jun 22 10:29:53 EDT 2009


Author: jbarrez
Date: 2009-06-22 10:29:53 -0400 (Mon, 22 Jun 2009)
New Revision: 5080

Removed:
   jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java
Modified:
   jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java
   jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java
   jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java
   jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml
Log:
Work in progress: concurrency testing

Modified: jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java	2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/main/java/org/jbpm/test/concurrent/SynchronizableCommandExecutor.java	2009-06-22 14:29:53 UTC (rev 5080)
@@ -80,6 +80,8 @@
     
     private CyclicBarrier afterExecutionBarrier;
     
+    private List<SynchronizableCommandExecutor> executorsSyncedAfterExecution;
+    
     private CyclicBarrier beforeExecutionBarrier;
     
     /**
@@ -187,11 +189,13 @@
       }
       
       if (afterExecutionBarrier == null ) {
-        afterExecutionBarrier = new CyclicBarrier(2);
+        setSyncPointForAfterExecution(new CyclicBarrier(2));
       } else {
-        afterExecutionBarrier = new CyclicBarrier(afterExecutionBarrier.getParties() + 1);
+        setSyncPointForAfterExecution(new CyclicBarrier(afterExecutionBarrier.getParties() + 1));
       }
       
+      setSyncPointForAfterExecution(afterExecutionBarrier);
+      
       return this;
     }
     
@@ -200,6 +204,11 @@
         throw new RuntimeException("Cannot set synchronization point once the thread has been started");
       }
       
+      if (executorsSyncedAfterExecution == null) {
+        executorsSyncedAfterExecution = new ArrayList<SynchronizableCommandExecutor>();
+      }
+      executorsSyncedAfterExecution.add(otherExecutor);
+      
       int threadsInvolved = 0;
       if (afterExecutionBarrier != null) {
         threadsInvolved += afterExecutionBarrier.getParties();
@@ -213,12 +222,21 @@
         threadsInvolved++;
       }
       
-      this.afterExecutionBarrier = new CyclicBarrier(threadsInvolved);
-      otherExecutor.afterExecutionBarrier = this.afterExecutionBarrier;
+      setSyncPointForAfterExecution(new CyclicBarrier(threadsInvolved));
       
       return this; 
     }
     
+    private void setSyncPointForAfterExecution(CyclicBarrier syncpoint) {
+      this.afterExecutionBarrier = syncpoint;
+      if (executorsSyncedAfterExecution == null) {
+        executorsSyncedAfterExecution = new ArrayList<SynchronizableCommandExecutor>();
+      }
+      for (SynchronizableCommandExecutor executor : executorsSyncedAfterExecution) {
+        executor.afterExecutionBarrier = this.afterExecutionBarrier;
+      }
+    }
+    
     public SynchronizableCommandExecutor synchroniseBeforeExecution() {
       if (isAlive()) {
         throw new RuntimeException("Cannot set synchronization point once the JobExecutorEmulator has been started");

Modified: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java	2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/AsyncForkTest.java	2009-06-22 14:29:53 UTC (rev 5080)
@@ -6,10 +6,24 @@
 import org.jbpm.api.ProcessInstance;
 import org.jbpm.api.job.Job;
 
-
+/**
+ * Concurrency test case: when using an async fork with different outgoing
+ * transitions, conflicts can potentially occur (e.g. when the outgoing paths
+ * come together in the Join activity at the same time). 
+ * 
+ * @author jbarrez
+ */
 public class AsyncForkTest extends ConcurrentJbpmTestCase {
   
-  public void testAsyncForkNoOptimisticLockingFailure() {
+  /**
+   * Test case using an async fork with 2 outgoing transitions.
+   * In jBPM3, a StaleStateException was thrown when the different paths
+   * came together in the Join at the same time.
+   * 
+   * However, the jBPM4 Join activity is designed with concurrency in mind,
+   * and the StaleStateException should not occur when this scenario happens.
+   */
+  public void testAsyncForkNoOptimisticLockingFailure()  {
     deployJpdlXmlString(
             "<process name='asyncFork'>" +
             "  <start>" +
@@ -26,12 +40,19 @@
             "  <custom name='pathB' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
             "    <transition to='theJoin' />" +
             "  </custom>" + 
-            "  <join name='theJoin'>" +
-            "    <transition to='test' />" +
+            // Can't test with default lock-mode (upgrade). SELECT ... FOR UPGRADE 
+            // will block transactions at database level with no decent approach 
+            // to check if the thread is blocking. So we use the default lockmode,
+            // which is the standard Hibernate optimistic locking.
+            //
+            // Note: not using lockmode upgrade can cause the Join logic to
+            // work with incorrect data: ie it could be that an incoming 
+            // transition is not seen as the 'last' one, due to a concurrent 
+            // read of data. This situation is avoided in the test by executing
+            // the Join activity logic of the last transition only after the other one.
+            "  <join name='theJoin' lockmode='none'>" + 
+            "    <transition to='end' />" +
             "  </join>" + 
-            "  <state name='test' >" +
-            "    <transition to='end' /> " +
-            "  </state>" +
             "  <end name='end' />" +
             "</process>"
           );
@@ -40,19 +61,30 @@
     final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
     assertEquals(2, jobs.size()); 
     
-    SynchronizableCommandExecutor executor1 = new SynchronizableCommandExecutor(environmentFactory, jobs.get(0));
-    SynchronizableCommandExecutor executor2 = new SynchronizableCommandExecutor(environmentFactory, jobs.get(1));
+    SynchronizableCommandExecutor executor1 = startThreadAndSyncAfterExecution(jobs.get(0));
+    SynchronizableCommandExecutor executor2 = startThreadAndSyncAfterExecution(jobs.get(1));
     
-    executor1.synchroniseAfterExecution(executor2);
-    executor1.synchroniseAfterExecution();
-
-    executor1.start();
-    executor2.start();
+    try {
+      executor1.join();
+      executor2.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
     
     if (executor1.getException() instanceof StaleStateException
             || executor2.getException() instanceof StaleStateException) {
       fail("A StaleStaeException was thrown, altough this shouldn't happen");
     }
+    
+    assertProcessInstanceEnded(processInstance);
   }
+  
+  private SynchronizableCommandExecutor startThreadAndSyncAfterExecution(Job job) {
+    SynchronizableCommandExecutor executor = new SynchronizableCommandExecutor(environmentFactory, job);
+    executor.synchroniseAfterExecution();
+    executor.start();
+    executor.waitUntilExecutionFinished(false);
+    return executor;
+  }
 
 }

Deleted: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java	2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/OptimisticLockTestGround.java	2009-06-22 14:29:53 UTC (rev 5080)
@@ -1,255 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-/**
- * 
- */
-package org.jbpm.test.concurrent;
-
-import java.util.List;
-import java.util.concurrent.Semaphore;
-
-import org.jbpm.api.Execution;
-import org.jbpm.api.ProcessInstance;
-import org.jbpm.api.env.Environment;
-import org.jbpm.api.env.EnvironmentFactory;
-import org.jbpm.api.job.Job;
-import org.jbpm.pvm.internal.cmd.ExecuteJobCmd;
-import org.jbpm.pvm.internal.tx.StandardTransaction;
-import org.jbpm.test.JbpmTestCase;
-
-
-/**
- * Class to test different approaches to the concurrency problem.
- * 
- * Doesnt work anymore, but dont delete yet, I need some stuff in here for later!
- *  * @author Joram Barrez
- */
-public class OptimisticLockTestGround extends JbpmTestCase {
-
-  private EnvironmentFactory environmentFactory;
-
-  protected void setUp() throws Exception {
-    super.setUp();
-    this.environmentFactory = (EnvironmentFactory) processEngine; // Better way to do this?
-  }
-  
-
-  public void testMe() throws Exception {
-    deployJpdlXmlString(
-            "<process name='asyncFork'>" +
-            "  <start>" +
-            "    <transition to='theFork' />" +
-            "  </start>" +
-            "  <fork name='theFork'>" +
-            "    <on event='end' continue='async' />" +
-            "    <transition to='pathA' />" +
-            "    <transition to='pathB' />" +
-            "  </fork>" +
-            "  <custom name='pathA' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
-            "    <transition to='theJoin' />" +
-            "  </custom>" + 
-            "  <custom name='pathB' class='org.jbpm.test.concurrent.PassThroughActivity' >" +
-            "    <transition to='theJoin' />" +
-            "  </custom>" + 
-            "  <join name='theJoin' lockmode='none'>" +
-            "    <transition to='end' />" +
-            "  </join>" + 
-            "  <end name='end' />" +
-            "</process>"
-          );
-    
-    ProcessInstance processInstance = executionService.startProcessInstanceByKey("asyncFork");
-    final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
-    assertEquals(2, jobs.size()); 
-    
-    JobExecutorEmulator jobExecutorEmulator1 = new JobExecutorEmulator(jobs.get(0));
-    jobExecutorEmulator1.start();
-    
-    JobExecutorEmulator jobExecutorEmulator2 = new JobExecutorEmulator(jobs.get(1));
-    jobExecutorEmulator2.start();
-    
-    // TODO check if transitions are marked for rollback -> exception happened
-    
-    jobExecutorEmulator1.join();
-    if (jobExecutorEmulator1.getException() != null) {
-      fail("Error while executing job: " + jobExecutorEmulator1.getException().getMessage());
-    }
-    
-    jobExecutorEmulator2.join();
-    if (jobExecutorEmulator2.getException() != null) {
-      fail("Error while executing job: " + jobExecutorEmulator2.getException().getMessage());
-    }
-    
-  }
-  
-  public void testMeToo() throws Exception {
-    
-    semaphore = new Semaphore(0);
-    
-    deployJpdlXmlString(
-            "<process name='timer_vs_signal'>" +
-            "  <start>" +
-            "    <transition to='wait' />" +
-            "  </start>" +
-            "  <state name='wait'>" +
-            "    <transition name='timeout' to='end'>" +
-            "      <timer duedate='1 second' />" +
-            "    </transition>" +
-            "    <transition to='end' name='go on' />" +
-            "  </state>" +
-            "  <end name='end' />" +
-            "</process>"
-          );
-    
-    ProcessInstance processInstance = executionService.startProcessInstanceByKey("timer_vs_signal");
-    final List<Job> jobs = managementService.createJobQuery().processInstanceId(processInstance.getId()).list();
-    assertEquals(1, jobs.size()); 
-    
-    JobExecutorEmulator jobExecutorEmulator1 = new JobExecutorEmulator(jobs.get(0));
-    jobExecutorEmulator1.start();
-   
-    while (!semaphore.hasQueuedThreads()) {
-      synchronized (this) {
-        System.out.println("------------------> waiting ...");
-        wait(20L);
-      }
-    }
-    
-    // cause conflict
-    System.out.println("---------------------------> Causing MayHem");
-    
-    Execution executionAtState = processInstance.findActiveExecutionIn("wait");
-    assertNotNull(executionAtState);
-    executionService.signalExecutionById(executionAtState.getId(), "go on");
-    semaphore.release();
-    System.out.println("----------------------------> Released semaphore");
-    
-    // TODO check if transitions are marked for rollback -> exception happened
-    
-    jobExecutorEmulator1.join();
-    if (jobExecutorEmulator1.getException() != null) {
-      fail("Error while executing job: " + jobExecutorEmulator1.getException().getMessage());
-    }
-    
-  }
-  
-  // Todo refactor
-  private static Semaphore semaphore = new Semaphore(1); // binary semaphore
-  
-  
-  private class JobExecutorEmulator extends Thread {
-    
-    private Long jobId;
-    
-    private Exception exception;
-    
-    public JobExecutorEmulator(Job job) {
-      this.jobId = job.getDbid();
-    }
-    
-    public void run() {
-      
-      Environment environment = environmentFactory.openEnvironment();
-      StandardTransaction standardTransaction = environment.get(StandardTransaction.class);
-      standardTransaction.begin();
-
-      try {
-        
-        System.out.println(Thread.currentThread().getName() + "----------------------------------------> executing job " + jobId);
-        ExecuteJobCmd executeJobCmd = new ExecuteJobCmd(jobId);
-        executeJobCmd.execute(environment);
-        
-        System.out.println(Thread.currentThread().getName() +"-------------------------------------------------------> DONE EXECUTING JOB " + jobId);
-        
-      } catch (Exception e) {
-        standardTransaction.setRollbackOnly();
-        this.exception = e;
-        
-        System.out.println(Thread.currentThread().getName() +" -----------------------------------------------------> IM A FAILURE");
-      } finally {
-        
-        
-        // error: both threads entering at the same time!
-        /*
-        boolean acquired = false;
-        System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> BEFORE SYNC BLOCK");
-        synchronized (semaphore) {
-          acquired = semaphore.tryAcquire();
-        }
-        System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> AFTER SYNC BLOCK");
-        if (acquired) {
-          
-          try {
-            System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> ACQUIRING");
-            synchronized (semaphore) {
-              semaphore.wait();
-            }
-            System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> FREED FROM ACQUIRED");
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-        */
-        
-        
-        if (!semaphore.hasQueuedThreads()) {
-          try {
-            System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> ACQUIRING");
-            semaphore.acquire();
-            System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> FREED FROM ACQUIRED");
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-        
-        
-        
-        if (standardTransaction.isRollbackOnly()) {
-          exception = new Exception("Transaction was rollbacked");
-        }
-        
-        System.out.println(Thread.currentThread().getName() + "-------------------------------------------------> COMPLETING ");
-        standardTransaction.complete();
-        
-        System.out.println(Thread.currentThread().getName() + "------------------------------------------------------> RELEASING");
-        
-        /*if (!acquired) {
-          semaphore.release();
-          synchronized (semaphore) {
-            semaphore.notify();
-          }
-        }*/
-        
-      }
-      environment.close();
-      System.out.println(Thread.currentThread().getName() + " has finished");
-    }
-
-    
-    public Exception getException() {
-      return exception;
-    }
-
-
-  }
-  
-}

Modified: jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java	2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/java/org/jbpm/test/concurrent/TimerVsSignalConcurrencyTest.java	2009-06-22 14:29:53 UTC (rev 5080)
@@ -34,11 +34,18 @@
 
 
 /**
+ * Concurrency test case: when a timer is defined on an activity, a job
+ * will be executed by the jobExecutor. But when at the same time a 
+ * signal is done on the same activity, a conflicting situation occurs.
+ * 
+ * This test case will mimic this behaviour to understand how the different
+ * databases react to such a conflict. 
+ * 
  * @author Joram Barrez
  */
 public class TimerVsSignalConcurrencyTest extends ConcurrentJbpmTestCase {
   
-  public void testStaleObjectExceptionThrown() throws Exception {
+  public void testStaleObjectExceptionThrown() {
     
     deployJpdlXmlString(
             "<process name='timer_vs_signal'>" +
@@ -76,13 +83,22 @@
     
     // Best effort: wait 1 sec and see if the staleObjectException has been caused
     synchronized (this) {
-      wait(1000L);
+      try {
+        wait(1000L);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
     }
     
     jobExecutorEmulator.goOn();
     
-    jobExecutorEmulator.join();
-    signalThread.join();
+    try {
+      jobExecutorEmulator.join();
+      signalThread.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    
 
     
     if (!(jobExecutorEmulator.getException() instanceof StaleStateException

Modified: jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml
===================================================================
--- jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml	2009-06-22 14:26:18 UTC (rev 5079)
+++ jbpm4/trunk/modules/test-concurrent/src/test/resources/jbpm.hibernate.cfg.xml	2009-06-22 14:29:53 UTC (rev 5080)
@@ -22,7 +22,7 @@
 	<property name="hibernate.connection.password">sa</property>
 	<property name="hibernate.default_schema">PUBLIC</property>
 	<property name="hibernate.dialect">org.hibernate.dialect.H2Dialect</property>
-
+    
 	
 	<!-- POSTGRES config 
 	<property name="hibernate.connection.driver_class">org.h2.Driver</property>
@@ -30,7 +30,7 @@
 	<property name="hibernate.connection.username">postgres</property>
 	<property name="hibernate.connection.password">postgres</property>
 	<property name="hibernate.dialect">org.hibernate.dialect.PostgreSQLDialect</property> 
-	-->                                          
+	-->                                   
      
     <property name="hibernate.hbm2ddl.auto">create-drop</property>
 	<property name="hibernate.format_sql">true</property>




More information about the jbpm-commits mailing list