[jbpm-commits] JBoss JBPM SVN: r3550 - in jbpm4/trunk/modules: pvm and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 26 07:28:52 EST 2008


Author: tom.baeyens at jboss.com
Date: 2008-12-26 07:28:51 -0500 (Fri, 26 Dec 2008)
New Revision: 3550

Added:
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/CompositeCmd.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/SendMessageCmd.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveMessagesTest.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveTestCommand.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceMessageTest.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceTestCommand.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingMessageTest.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingTestCommand.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/GenerateExceptionTestCommand.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorTestCase.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageCommand.java
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageTest.java
   jbpm4/trunk/modules/pvm/src/test/resources/jbpm.cfg.xml
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/AddCommentMessage.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/Automatic.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/MessageProcessingTest.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ProcessExecutionTest.java
   jbpm4/trunk/modules/test-load/src/test/resources/jbpm.load.hbm.xml
   jbpm4/trunk/modules/test-load/src/test/resources/org/
   jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/
   jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/
   jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/
   jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/process.jpdl.xml
Removed:
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ExclusiveTestCommand.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailOnceTestCommand.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailingTestCommand.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/GenerateExceptionTestCommand.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/JobExecutorTest.java
   jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/TestMessageCommand.java
Modified:
   jbpm4/trunk/modules/jpdl/src/main/java/org/jbpm/jpdl/activity/JavaActivity.java
   jbpm4/trunk/modules/pvm/pom.xml
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/job/JobImpl.java
   jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java
   jbpm4/trunk/modules/pvm/src/main/resources/jbpm.pvm.job.hbm.xml
   jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/db/model/CommentDbTest.java
   jbpm4/trunk/modules/pvm/src/test/resources/hibernate.properties
   jbpm4/trunk/modules/pvm/src/test/resources/logging.properties
   jbpm4/trunk/modules/test-load/src/test/resources/jbpm.cfg.xml
   jbpm4/trunk/modules/test-load/src/test/resources/logging.properties
Log:
split job executor tests into load tests and functional tests

Modified: jbpm4/trunk/modules/jpdl/src/main/java/org/jbpm/jpdl/activity/JavaActivity.java
===================================================================
--- jbpm4/trunk/modules/jpdl/src/main/java/org/jbpm/jpdl/activity/JavaActivity.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/jpdl/src/main/java/org/jbpm/jpdl/activity/JavaActivity.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -51,8 +51,13 @@
     Object target = wireContext.create(descriptor, false);
 
     try {
-      List<ArgDescriptor> argDescriptors = invokeOperation.getArgDescriptors();
-      Object[] args = ObjectDescriptor.getArgs(wireContext, argDescriptors);
+      List<ArgDescriptor> argDescriptors = null;
+      Object[] args = null;
+      if (invokeOperation!=null) {
+        argDescriptors = invokeOperation.getArgDescriptors();
+        args = ObjectDescriptor.getArgs(wireContext, argDescriptors);
+      }
+      
       Class<?> clazz = target.getClass();
       Method method = ReflectUtil.findMethod(clazz, methodName, argDescriptors, args);
       if (method==null) {

Modified: jbpm4/trunk/modules/pvm/pom.xml
===================================================================
--- jbpm4/trunk/modules/pvm/pom.xml	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/pom.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -141,6 +141,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <excludes>
+            <exclude>**/*TestCase.java</exclude>
             <exclude>**/ExecutionServiceTest.java</exclude>
             <exclude>**/AutomaticDecisionDbTest.java</exclude>
             <exclude>**/PersistentExecutionModeTest.java</exclude>

Added: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/CompositeCmd.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/CompositeCmd.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/CompositeCmd.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.cmd;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+
+
+/** container for executing multiple commands in one transaction. 
+ * 
+ * @author Tom Baeyens
+ */
+public class CompositeCmd implements Command<Void> {
+  private static final long serialVersionUID = 1L;
+  protected List<Command<?>> commands = new ArrayList<Command<?>>();
+
+  /** runs the added commands in insertion order; an exception from one command propagates and skips the rest. */
+  public Void execute(Environment environment) throws Exception {
+    for (Command<?> command: commands) {
+      command.execute(environment);
+    }
+    return null;
+  }
+
+  /** appends a command to the end of the list of commands to run. */
+  public void addCommand(Command<?> command) {
+    commands.add(command);
+  }
+}


Property changes on: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/CompositeCmd.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/SendMessageCmd.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/SendMessageCmd.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/SendMessageCmd.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.cmd;
+
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.job.Message;
+import org.jbpm.session.MessageSession;
+
+/** sends a given message to the configured message service.
+ * 
+ * @author Tom Baeyens
+ */
+public class SendMessageCmd implements Command<Object> {
+  private static final long serialVersionUID = 1L;
+
+  /** the message that is handed to the message session on execution. */
+  protected Message message;
+
+  public SendMessageCmd(Message message) {
+    this.message = message;
+  }
+
+  /** looks up the MessageSession in the environment, sends the message and returns null. */
+  public Object execute(Environment environment) throws Exception {
+    environment.get(MessageSession.class).send(message);
+    return null;
+  }
+}
\ No newline at end of file


Property changes on: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/cmd/SendMessageCmd.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/job/JobImpl.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/job/JobImpl.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/job/JobImpl.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -17,11 +17,8 @@
   protected int dbversion;
 
   /** date until which the command should not be executed
-   * this date is set to the current time
-   * It should be modified only for timers
-   * Warning: if you modify this, be sure to wake the JobExecutor
-   * when the jobImpl is supposed to be executed */
-  protected Date dueDate = new Date();
+   * for async messages, this dueDate should be set to null. */
+  protected Date dueDate = null;
   
   /** suspended jobs will not execute. */
   protected boolean isSuspended;
@@ -44,6 +41,9 @@
   /** the time the lock on this jobImpl expires. */
   protected Date lockExpirationTime;
 
+  /** any information that subclasses might use in a specific way. */
+  protected String info;
+
   /** stack trace of the exception that occurred during command execution. */
   protected String exception;
   

Modified: jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/main/java/org/jbpm/pvm/internal/jobexecutor/JobExceptionHandler.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -62,7 +62,9 @@
     // execute this job exception handler object as a command with 
     // the command service so that this gets done in a separate 
     // transaction
+    log.debug("starting new transaction for handling job exception");
     commandService.execute(this);
+    log.debug("completed transaction for handling job exception");
   }
 
   public Object execute(Environment environment) throws Exception {

Modified: jbpm4/trunk/modules/pvm/src/main/resources/jbpm.pvm.job.hbm.xml
===================================================================
--- jbpm4/trunk/modules/pvm/src/main/resources/jbpm.pvm.job.hbm.xml	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/main/resources/jbpm.pvm.job.hbm.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -18,6 +18,7 @@
     <property name="isExclusive" column="ISEXCLUSIVE_" />
     <property name="lockOwner" column="LOCKOWNER_" />
     <property name="lockExpirationTime" column="LOCKEXPTIME_" />
+    <property name="info" column="INFO_" />
     <property name="exception" column="EXCEPTION_" type="text" />
     <property name="retries" column="RETRIES_" />
     

Modified: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/db/model/CommentDbTest.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/db/model/CommentDbTest.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/db/model/CommentDbTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -35,14 +35,8 @@
 /**
  * @author Tom Baeyens
  */
-public class CommentDbTest extends EnvironmentDbTestCase
-{
+public class CommentDbTest extends EnvironmentDbTestCase {
 
-  public static Test suite()
-  {
-    return new EnvironmentFactoryTestSetup(CommentDbTest.class);
-  }
-
   public void testComment()
   {
     Date now = new Date();

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveMessagesTest.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveMessagesTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveMessagesTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.hibernate.Session;
+import org.jbpm.Execution;
+import org.jbpm.activity.ActivityExecution;
+import org.jbpm.activity.ExternalActivity;
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.pvm.internal.cmd.StartExecutionCmd;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.ProcessDefinitionImpl;
+import org.jbpm.pvm.model.ProcessFactory;
+import org.jbpm.session.MessageSession;
+
+/**
+ * @author Tom Baeyens
+ */
+public class ExclusiveMessagesTest extends JobExecutorTestCase {
+
+  /** maps each execution key to the ids of the threads that ran exclusive jobs
+   * for that execution.  ExclusiveTestCommand fills it while the messages are processed. */
+  static Map<String, Set<Long>> exclusiveThreadIds;
+
+  static int nbrOfTestMessagesPerExecution = 5;
+  static int nbrOfTestExecutions = 5;
+
+  public void setUp() throws Exception {
+    super.setUp();
+    // filled from whichever thread runs a job, so a plain HashMap is not safe here
+    exclusiveThreadIds = java.util.Collections.synchronizedMap(new HashMap<String, Set<Long>>());
+  }
+
+  /** sends exclusive messages for several executions, lets the job executor
+   * process them and checks that each execution was served by a single thread. */
+  public void testExclusiveMessageProcessing() {
+    insertExclusiveTestMessages();
+
+    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
+    jobExecutor.start();
+    try {
+      waitTillNoMoreMessages(jobExecutor);
+    } finally {
+      jobExecutor.stop(true);
+    }
+
+    commandService.execute(new Command<Object>() {
+      public Object execute(Environment environment) throws Exception {
+        // exclusiveThreadIds maps execution keys to a set of thread ids.
+        // the idea is that for each execution, all the exclusive jobs will
+        // be executed by 1 thread sequentially.
+        for (int i = 0; i < nbrOfTestExecutions; i++) {
+          String executionKey = "execution-" + i;
+          Set<Long> threadIds = exclusiveThreadIds.get(executionKey);
+          assertNotNull("no thread id set for " + executionKey + " in: " + exclusiveThreadIds, threadIds);
+          assertEquals("exclusive messages for " + executionKey + " have been executed by multiple threads: " + threadIds, 1, threadIds.size());
+        }
+        return null;
+      }
+    });
+  }
+
+  /** activity that waits for a signal; on signal it calls take with the signal name. */
+  public static class WaitState implements ExternalActivity {
+
+    private static final long serialVersionUID = 1L;
+
+    public void execute(ActivityExecution execution) throws Exception {
+      execution.waitForSignal();
+    }
+    public void signal(ActivityExecution execution, String signalName, Map<String, Object> parameters) throws Exception {
+      execution.take(signalName);
+    }
+  }
+
+  /** saves the "excl" process definition and then, in a second command, starts the
+   * test executions and sends the exclusive test messages for each of them. */
+  public void insertExclusiveTestMessages() {
+    commandService.execute(new Command<Object>() {
+      public Object execute(Environment environment) throws Exception {
+        ProcessDefinitionImpl processDefinition = (ProcessDefinitionImpl) ProcessFactory.build("excl")
+          .node("wait").initial().behaviour(WaitState.class)
+        .done();
+        processDefinition.setId("excl:1");
+
+        Session session = environment.get(Session.class);
+        session.save(processDefinition);
+        return null;
+      }
+    });
+
+    commandService.execute(new Command<Object>() {
+      public Object execute(Environment environment) throws Exception {
+        MessageSession messageSession = environment.get(MessageSession.class);
+        for (int i = 0; i < nbrOfTestExecutions; i++) {
+          Execution execution = new StartExecutionCmd("excl:1", null, "execution-" + i).execute(environment);
+          for (int j = 0; j < nbrOfTestMessagesPerExecution; j++) {
+            CommandMessage exclusiveTestMessage = ExclusiveTestCommand.createMessage(execution);
+            messageSession.send(exclusiveTestMessage);
+          }
+        }
+        return null;
+      }
+    });
+  }
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveMessagesTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Copied: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveTestCommand.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ExclusiveTestCommand.java)
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveTestCommand.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/ExclusiveTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.hibernate.Session;
+import org.jbpm.Execution;
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.log.Log;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.ExecutionImpl;
+import org.jbpm.pvm.internal.wire.descriptor.LongDescriptor;
+import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
+
+
+/**
+ * @author Tom Baeyens
+ */
+public class ExclusiveTestCommand implements Command<Object> {
+  private static final long serialVersionUID = 1L;
+  private static final Log log = Log.getLog(ExclusiveTestCommand.class.getName());
+  static Random random = new Random();
+
+  long executionId;
+
+  public ExclusiveTestCommand() {
+  }
+
+  /** creates an exclusive message for the given execution that runs an ExclusiveTestCommand. */
+  public static CommandMessage createMessage(Execution execution) {
+    CommandMessage commandMessage = new CommandMessage();
+    commandMessage.setExecution((ExecutionImpl) execution);
+    commandMessage.setExclusive(true);
+
+    ObjectDescriptor commandDescriptor = new ObjectDescriptor(ExclusiveTestCommand.class);
+    commandDescriptor.addInjection("executionId", new LongDescriptor(execution.getDbid()));
+    commandMessage.setCommandDescriptor(commandDescriptor);
+    return commandMessage;
+  }
+
+  /** records the id of the current thread under the key of this command's execution. */
+  public Object execute(Environment environment) throws Exception {
+    Long threadId = Thread.currentThread().getId();
+    Session session = environment.get(Session.class);
+    ExecutionImpl execution = (ExecutionImpl) session.get(ExecutionImpl.class, executionId);
+    String executionKey = execution.getKey();
+
+    // exclusiveThreadIds maps execution keys to a set of thread ids.
+    // the idea is that for each execution, all the exclusive jobs will
+    // be executed by 1 thread sequentially.
+    // in the end, each set should contain exactly 1 element
+
+    // the lock keeps lookup-create-add atomic: otherwise two threads could both create
+    // a set for the same key and one would be lost, hiding the failure this test detects.
+    synchronized (ExclusiveMessagesTest.class) {
+      Set<Long> groupMessages = ExclusiveMessagesTest.exclusiveThreadIds.get(executionKey);
+      if (groupMessages==null) {
+        groupMessages = new HashSet<Long>();
+        ExclusiveMessagesTest.exclusiveThreadIds.put(executionKey, groupMessages);
+      }
+      groupMessages.add(threadId);
+    }
+
+    /*
+    // let's assume that an average jobImpl takes between 0 and 150 millis to complete.
+    int workTime = random.nextInt(150);
+    log.debug("executing exclusive message for "+execution+".  this is going to take "+workTime+"ms");
+    try {
+      Thread.sleep(workTime);
+    } catch (RuntimeException e) {
+      log.debug("sleeping was interrupted");
+    }
+    */
+    return null;
+  }
+}

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceMessageTest.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceMessageTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceMessageTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.hibernate.Session;
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.log.Log;
+import org.jbpm.model.Comment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.session.MessageSession;
+
+
+/**
+ * @author Tom Baeyens
+ */
+public class FailOnceMessageTest extends JobExecutorTestCase {
+
+  private static final Log log = Log.getLog(FailOnceMessageTest.class.getName());
+
+  static long nbrOfTestMessages = 1;
+
+  /** ids of the messages that failed once.  FailOnceTestCommand adds an id right before it throws. */
+  static List<Integer> failOnceMessageIds = Collections.synchronizedList(new ArrayList<Integer>());
+
+  /** checks that every message id was registered as failed once and that exactly one comment was committed per message. */
+  public void testFailOnceMessages() {
+    failOnceMessageIds.clear();
+
+    jobExecutor.start();
+    try {
+      insertFailOnceTestMessages();
+      waitTillNoMoreMessages(jobExecutor);
+    } finally {
+      log.debug("stopping job executor");
+      jobExecutor.stop(true);
+    }
+
+    for (int i = 0; i < nbrOfTestMessages; i++) {
+      assertTrue("message " + i + " is not failed once: " + failOnceMessageIds, failOnceMessageIds.contains(i));
+    }
+    assertEquals(nbrOfTestMessages, failOnceMessageIds.size());
+
+    log.debug("==== all messages processed, now checking if all messages have arrived exactly once ====");
+
+    commandService.execute(new Command<Object>() {
+      public Object execute(Environment environment) throws Exception {
+        Session session = environment.get(Session.class);
+        List<Comment> comments = session.createQuery("from " + CommentImpl.class.getName()).list();
+        for (Comment comment : comments) {
+          log.debug("retrieved message: "+comment.getMessage());
+          Integer messageId = Integer.valueOf(comment.getMessage());
+          assertTrue("message " + messageId + " committed twice", failOnceMessageIds.remove(messageId));
+        }
+
+        assertTrue("not all messages made a successful commit: " + failOnceMessageIds, failOnceMessageIds.isEmpty());
+        return null;
+      }
+    });
+  }
+
+  /** command that sends one FailOnceTestCommand message carrying the given message id. */
+  public static class InsertFailOnceTestMsgCmd implements Command<Object> {
+    private static final long serialVersionUID = 1L;
+    int i;
+    public InsertFailOnceTestMsgCmd(int i) {
+      this.i = i;
+    }
+    public Object execute(Environment environment) throws Exception {
+      MessageSession messageSession = environment.get(MessageSession.class);
+      CommandMessage commandMessage = FailOnceTestCommand.createMessage(i);
+      messageSession.send(commandMessage);
+      return null;
+    }
+  }
+
+  /** sends the test messages, each one in a command of its own. */
+  void insertFailOnceTestMessages() {
+    for (int i = 0; i < nbrOfTestMessages; i++) {
+      commandService.execute(new InsertFailOnceTestMsgCmd(i));
+    }
+  }
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceMessageTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Copied: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceTestCommand.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailOnceTestCommand.java)
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceTestCommand.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailOnceTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.log.Log;
+import org.jbpm.model.Comment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.job.JobImpl;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
+import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
+import org.jbpm.session.DbSession;
+
+
+/**
+ * @author Tom Baeyens
+ */
+public class FailOnceTestCommand implements Command<Object> {
+  private static final long serialVersionUID = 1L;
+  private static final Log log = Log.getLog(FailOnceTestCommand.class.getName());
+
+  int messageId;
+
+  public FailOnceTestCommand() {
+  }
+
+  /** creates a message that runs a FailOnceTestCommand with the given message id injected. */
+  public static CommandMessage createMessage(int messageId) {
+    CommandMessage commandMessage = new CommandMessage();
+    ObjectDescriptor commandDescriptor = new ObjectDescriptor(FailOnceTestCommand.class);
+    commandDescriptor.addInjection("messageId", new IntegerDescriptor(messageId));
+    commandMessage.setCommandDescriptor(commandDescriptor);
+    return commandMessage;
+  }
+
+  /** saves a comment holding the message id; throws the first time it runs for a given id. */
+  public Object execute(Environment environment) throws Exception {
+    DbSession dbSession = environment.get(DbSession.class);
+
+    // on the first (failing) execution this save should be rolled back
+    Comment comment = new CommentImpl(Integer.toString(messageId));
+    dbSession.save(comment);
+
+    if (!FailOnceMessageTest.failOnceMessageIds.contains(messageId)) {
+      // registering the failed message in a non-transactional resource
+      // so the messageId will still be added even after the transaction has rolled back
+      log.debug("adding failonce message "+messageId);
+      FailOnceMessageTest.failOnceMessageIds.add(messageId);
+      throw new RuntimeException("failing once");
+    }
+
+    try {
+      Thread.sleep(700);
+    } catch (InterruptedException e) {
+      log.error(e.toString());
+      // restore the interrupt status so that the thread running this command still sees it
+      Thread.currentThread().interrupt();
+    }
+
+    log.debug("message "+messageId+" now succeeds");
+    return null;
+  }
+}

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingMessageTest.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingMessageTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingMessageTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.List;
+
+import org.hibernate.Session;
+import org.jbpm.cmd.Command;
+import org.jbpm.cmd.CommandService;
+import org.jbpm.env.Environment;
+import org.jbpm.job.Job;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.session.MessageSession;
+import org.jbpm.session.PvmDbSession;
+import org.jbpm.test.DbTestCase;
+
+
+/**
+ * Sends a single {@link FailingTestCommand} message, lets the job executor
+ * work on it, and then checks what is left behind in the database.
+ *
+ * @author Tom Baeyens
+ */
+public class FailingMessageTest extends JobExecutorTestCase {
+
+  /**
+   * Starts the job executor, sends one failing message, waits until the
+   * polling in waitTillNoMoreMessages reports no more jobs and stops the
+   * executor again.  Afterwards it asserts that exactly one job is returned
+   * by findJobsWithException and that no CommentImpl is stored.
+   */
+  public void testFailedMessageProcessing() {
+    jobExecutor.start();
+    try {
+      commandService.execute(new Command<Object>() {
+
+        public Object execute(Environment environment) throws Exception {
+          MessageSession messageSession = environment.get(MessageSession.class);
+          CommandMessage commandMessage = FailingTestCommand.createMessage();
+          messageSession.send(commandMessage);
+          return null;
+        }
+      });
+      // keep polling until no due job with retries left is found
+      waitTillNoMoreMessages(jobExecutor);
+
+    } finally {
+      jobExecutor.stop(true);
+    }
+    // the checks run in their own command, after the executor was stopped
+    commandService.execute(new Command<Object>() {
+
+      public Object execute(Environment environment) throws Exception {
+        PvmDbSession pvmDbSession = environment.get(PvmDbSession.class);
+        List<Job> deadJobs = pvmDbSession.findJobsWithException(0, 10);
+        assertEquals("there should be one dead job", 1, deadJobs.size());
+        // FailingTestCommand saves a CommentImpl before it throws; none may remain
+        Session session = environment.get(Session.class);
+        List commands = session.createQuery("from " + CommentImpl.class.getName()).list();
+        assertTrue("command insertion should have been rolled back", commands.isEmpty());
+        return null;
+      }
+    });
+  }
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingMessageTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Copied: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingTestCommand.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailingTestCommand.java)
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingTestCommand.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/FailingTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.model.Comment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
+import org.jbpm.session.DbSession;
+
+
+/**
+ * Command that saves a {@link CommentImpl} and then always throws a
+ * {@link RuntimeException}.
+ *
+ * @author Tom Baeyens
+ */
+public class FailingTestCommand implements Command<Object> {
+
+  private static final long serialVersionUID = 1L;
+  /**
+   * Saves a comment with the text "failing update" through the
+   * {@link DbSession} and then throws.
+   *
+   * @return never returns normally
+   * @throws RuntimeException always, with the message "ooops"
+   */
+  public Object execute(Environment environment) throws Exception {
+    DbSession dbSession = environment.get(DbSession.class);
+    
+    // this message execution should be rolled back
+    Comment comment = new CommentImpl("failing update");
+    dbSession.save(comment);
+    
+    throw new RuntimeException("ooops"); 
+  }
+  /**
+   * Builds a {@link CommandMessage} whose command descriptor refers to this class.
+   *
+   * @return the message, ready to be sent
+   */
+  public static CommandMessage createMessage() {
+    CommandMessage commandMessage = new CommandMessage();
+    ObjectDescriptor commandDescriptor = new ObjectDescriptor(FailingTestCommand.class);
+    commandMessage.setCommandDescriptor(commandDescriptor);
+    return commandMessage;
+  }
+
+}

Copied: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/GenerateExceptionTestCommand.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/GenerateExceptionTestCommand.java)
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/GenerateExceptionTestCommand.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/GenerateExceptionTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
+import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
+
+
+/**
+ * Simple command that throws an exception during execution.
+ * The exception message is built by repeating a fixed sentence until it is
+ * at least <code>length</code> characters long, so the size of the exception
+ * text is controlled by the length parameter.
+ *
+ * This class is to test the persistence of exception stacktrace in jobs.
+ *
+ * @author Tom Baeyens
+ * @author Guillaume Porcher
+ */
+public class GenerateExceptionTestCommand implements Command<Object> {
+
+  private static final long serialVersionUID = 1L;
+  /** minimum number of characters of the generated exception message */
+  int length;
+  
+  public GenerateExceptionTestCommand() {
+  }
+  /**
+   * Builds a {@link CommandMessage} whose command descriptor refers to this
+   * class and injects the given value into the <code>length</code> field.
+   *
+   * @param recursionInitialDepth value injected as <code>length</code>; despite
+   *        its name, execute uses it only as a minimum message length
+   */
+  public static CommandMessage createMessage(int recursionInitialDepth) {
+    CommandMessage commandMessage = new CommandMessage();
+    ObjectDescriptor commandDescriptor = new ObjectDescriptor(GenerateExceptionTestCommand.class);
+    commandDescriptor.addInjection("length", new IntegerDescriptor(recursionInitialDepth));
+    commandMessage.setCommandDescriptor(commandDescriptor);
+    return commandMessage;
+  }
+  /**
+   * Appends a fixed sentence until the text has at least <code>length</code>
+   * characters and throws a {@link RuntimeException} with that text.
+   *
+   * @throws RuntimeException always
+   */
+  public Object execute(Environment environment) throws Exception {
+    StringBuilder stringBuilder = new StringBuilder();
+    while (stringBuilder.length() < length) {
+      stringBuilder.append("This is a long test message. ");
+    }
+    throw new RuntimeException(stringBuilder.toString());
+  }
+
+}

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorTestCase.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorTestCase.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorTestCase.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.hibernate.Query;
+import org.hibernate.Session;
+import org.jbpm.cmd.Command;
+import org.jbpm.cmd.CommandService;
+import org.jbpm.env.Environment;
+import org.jbpm.pvm.internal.job.JobImpl;
+import org.jbpm.test.DbTestCase;
+
+
+/**
+ * Base class for tests that need the job executor.  It looks up the
+ * {@link CommandService} and the {@link JobExecutor} from the process engine
+ * and offers {@link #waitTillNoMoreMessages(JobExecutor)} to block until no
+ * due job with retries left is found.
+ *
+ * @author Tom Baeyens
+ */
+public class JobExecutorTestCase extends DbTestCase {
+
+  /** time in milliseconds after which the waiting test thread is interrupted */
+  static long timeoutMillis = 20 * 1000; // 20 seconds
+  /** pause in milliseconds between two checks for remaining jobs */
+  static long checkInterval = 200;
+
+  /** counts the jobs that have retries left and no due date or a due date that is not after :now */
+  static String jobsAvailableQueryText =
+      "select count(*) "+
+      "from "+JobImpl.class.getName()+" as job "+
+      "where ( (job.dueDate is null) or (job.dueDate <= :now) ) "+ 
+      "  and ( job.retries > 0 )";
+
+  protected CommandService commandService;
+  protected JobExecutor jobExecutor;
+
+  /** Looks up the command service and the job executor from the process engine. */
+  protected void setUp() throws Exception {
+    super.setUp();
+    
+    commandService = processEngine.get(CommandService.class);
+    jobExecutor = processEngine.get(JobExecutor.class);
+  }
+  
+  /**
+   * Sleeps and polls {@link #areJobsAvailable()} until it reports that no
+   * jobs are left.  A timer interrupts the test thread after
+   * <code>timeoutMillis</code>; an interrupted sleep makes the test fail.
+   *
+   * @param jobExecutor not used by this method
+   */
+  protected void waitTillNoMoreMessages(JobExecutor jobExecutor) {
+
+    // install a timer that will interrupt if it takes too long
+    // if that happens, it will lead to an interrupted exception and the test
+    // will fail
+    TimerTask interruptTask = new TimerTask() {
+
+      Thread testThread = Thread.currentThread();
+
+      public void run() {
+        log.debug("test " + getName() + " took too long. going to interrupt..." + testThread);
+        testThread.interrupt();
+      }
+    };
+    Timer timer = new Timer();
+    timer.schedule(interruptTask, timeoutMillis);
+
+    try {
+      boolean jobsAvailable = true;
+      while (jobsAvailable) {
+        log.debug("going to sleep for " + checkInterval + " millis, waiting for the job executor to process more jobs");
+        Thread.sleep(checkInterval);
+        jobsAvailable = areJobsAvailable();
+      }
+
+    } catch (InterruptedException e) {
+      fail("test execution exceeded treshold of " + timeoutMillis + " milliseconds");
+    } finally {
+      // always stop the timer so it cannot interrupt the thread later on
+      timer.cancel();
+    }
+  }
+
+  /**
+   * Runs the count query in its own command.
+   *
+   * @return true if at least one job matches the query, false otherwise
+   */
+  public boolean areJobsAvailable() {
+    return commandService.execute(new Command<Boolean>() {
+      public Boolean execute(Environment environment) {
+        Session session = environment.get(Session.class);
+
+        Query query = session.createQuery(jobsAvailableQueryText);
+        // bind date and time: setDate would drop the time of day, so jobs
+        // that became due earlier today would not match dueDate <= :now
+        query.setTimestamp("now", new Date());
+        
+        Long jobs = (Long) query.uniqueResult();
+
+        if (jobs.longValue()>0) {
+          log.debug("found "+jobs+" more jobs to process");
+          return true;
+        }
+        log.debug("no more jobs to process");
+        
+        return false;
+      }
+    });
+  }
+
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/JobExecutorTestCase.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageCommand.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageCommand.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.Random;
+
+import org.hibernate.Session;
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
+import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
+
+/**
+ * Command that stores a {@link CommentImpl} whose text is the message id.
+ *
+ * @author Tom Baeyens
+ */
+public class NormalMessageCommand implements Command<Void>  {
+  
+  private static final long serialVersionUID = 1L;
+  static Random random = new Random(); // not referenced anywhere in this class
+  /** id of the message; stored as the text of the saved comment */
+  int messageId;
+  
+  public NormalMessageCommand() {
+  }
+  /** Creates a command for the given message id. */
+  public NormalMessageCommand(int messageId) {
+    this.messageId = messageId;
+  }
+  /**
+   * Builds a {@link CommandMessage} whose command descriptor refers to this
+   * class and injects the given value into the <code>messageId</code> field.
+   */
+  public static CommandMessage createMessage(int messageId) {
+    CommandMessage commandMessage = new CommandMessage();
+    ObjectDescriptor commandDescriptor = new ObjectDescriptor(NormalMessageCommand.class);
+    commandDescriptor.addInjection("messageId", new IntegerDescriptor(messageId));
+    commandMessage.setCommandDescriptor(commandDescriptor);
+    return commandMessage;
+  }
+  /**
+   * Saves a comment with the message id as text in the hibernate session.
+   *
+   * @return always <code>null</code>
+   */
+  public Void execute(Environment environment) throws Exception {
+    CommentImpl comment = new CommentImpl(Integer.toString(messageId));
+    Session session = environment.get(Session.class);
+    session.save(comment);
+    return null;
+  }
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageCommand.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageTest.java
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,103 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.pvm.internal.jobexecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hibernate.Session;
+import org.jbpm.cmd.Command;
+import org.jbpm.env.Environment;
+import org.jbpm.model.Comment;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.session.MessageSession;
+import org.jbpm.test.Db;
+
+/**
+ * Sends a number of {@link NormalMessageCommand} messages, lets the job
+ * executor process them and checks that a comment was stored for each one.
+ *
+ * @author Tom Baeyens
+ */
+public class NormalMessageTest extends JobExecutorTestCase {
+
+  static long nbrOfTestMessages = 5;   
+  /*
+   * NOTE(review): the two fields below hide the fields with the same name in
+   * JobExecutorTestCase.  waitTillNoMoreMessages is declared in the superclass
+   * and reads the superclass fields, so these values do not appear to take
+   * effect -- confirm.
+   */
+  static long timeoutMillis = 30 * 1000; // 30 seconds
+  static long checkInterval = 400;
+
+  /** Runs the inherited setup and then cleans the database with Db.clean. */
+  protected void setUp() throws Exception {
+    super.setUp();
+    Db.clean(processEngine);
+  }
+  /**
+   * Sends nbrOfTestMessages messages with the ids 0, 1, 2, ... while the job
+   * executor is running, waits until no more jobs are reported and stops the
+   * executor.  Then it reads and deletes all stored comments and asserts that
+   * every message id appears among them.
+   */
+  public void testNormalMessageProcessing() {
+    JobExecutor jobExecutor = processEngine.get(JobExecutor.class); // NOTE(review): shadows the inherited jobExecutor field
+    jobExecutor.start();
+
+    try {
+      // insert ${nbrOfTestMessages} messages... 
+      for (int i = 0; i < nbrOfTestMessages; i++) {
+        commandService.execute(new InsertNormalMessageCmd(i));
+      }
+
+      // wait till all messages are processed
+      waitTillNoMoreMessages(jobExecutor);
+
+    } finally {
+      jobExecutor.stop(true);
+    }
+    // collect the ids found in the stored comments; every comment text is parsed as an int
+    List<Integer> processedMessageNumbers = commandService.execute(new Command<List<Integer>>() {
+      public List<Integer> execute(Environment environment) {
+        List<Integer> processedMessageNumbers = new ArrayList<Integer>();
+        Session session = environment.get(Session.class);
+        List<Comment> comments = session.createCriteria(CommentImpl.class).list();
+        for (Comment comment: comments) {
+          int processedMessageNumber = Integer.parseInt(comment.getMessage());
+          processedMessageNumbers.add(processedMessageNumber);
+          // make sure the db stays clean
+          session.delete(comment);
+        }
+        return processedMessageNumbers;
+      }
+    });
+    // every id that was sent must have produced a comment
+    for (int i = 0; i < nbrOfTestMessages; i++) {
+      assertTrue("message " + i + " is not processed: " + processedMessageNumbers, processedMessageNumbers.contains(i));
+    }
+  }
+  /** Command that sends one NormalMessageCommand message carrying the given id. */
+  public static class InsertNormalMessageCmd implements Command<Object> {
+    private static final long serialVersionUID = 1L;
+    int i;
+    public InsertNormalMessageCmd(int i) {
+      this.i = i;
+    }
+    public Object execute(Environment environment) throws Exception {
+      MessageSession messageSession = environment.get(MessageSession.class);
+      CommandMessage commandMessage = NormalMessageCommand.createMessage(i);
+      messageSession.send(commandMessage);
+      return null;
+    }
+  }
+
+}


Property changes on: jbpm4/trunk/modules/pvm/src/test/java/org/jbpm/pvm/internal/jobexecutor/NormalMessageTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: jbpm4/trunk/modules/pvm/src/test/resources/hibernate.properties
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/resources/hibernate.properties	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/test/resources/hibernate.properties	2008-12-26 12:28:51 UTC (rev 3550)
@@ -7,5 +7,5 @@
 hibernate.cache.use_second_level_cache true
 hibernate.cache.provider_class         org.hibernate.cache.HashtableCacheProvider
 # hibernate.show_sql                     true
-hibernate.format_sql                   true
-hibernate.use_sql_comments             true
+# hibernate.format_sql                   true
+# hibernate.use_sql_comments             true

Added: jbpm4/trunk/modules/pvm/src/test/resources/jbpm.cfg.xml
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/resources/jbpm.cfg.xml	                        (rev 0)
+++ jbpm4/trunk/modules/pvm/src/test/resources/jbpm.cfg.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<jbpm-configuration xmlns="http://jbpm.org/xsd/cfg">
+
+  <process-engine>
+  
+    <deployer-manager>
+      <assign-file-type>
+        <file extension=".jpdl.xml" type="jpdl" />
+      </assign-file-type>
+      <check-process />
+      <check-problems />
+      <save />
+    </deployer-manager>
+    
+    <process-service />
+    <execution-service />
+    <management-service />
+  
+    <command-service>
+      <retry-interceptor />
+      <environment-interceptor />
+      <standard-transaction-interceptor />
+    </command-service>
+    
+    <hibernate-configuration>
+      <properties resource="hibernate.properties" />
+      <mapping resource="jbpm.pvm.typedefs.hbm.xml" />
+      <mapping resource="jbpm.pvm.wire.hbm.xml" />
+      <mapping resource="jbpm.pvm.definition.hbm.xml" />
+      <mapping resource="jbpm.pvm.execution.hbm.xml" />
+      <mapping resource="jbpm.pvm.variable.hbm.xml" />
+      <mapping resource="jbpm.pvm.job.hbm.xml" />
+      <cache-configuration resource="jbpm.pvm.cache.xml" 
+                           usage="nonstrict-read-write" />
+    </hibernate-configuration>
+    
+    <hibernate-session-factory />
+    
+    <job-executor auto-start="false" />
+    <job-test-helper />
+
+    <id-generator />
+    <variable-types resource="jbpm.pvm.types.xml" />
+
+    <business-calendar>
+      <monday    hours="9:00-12:00 and 12:30-17:00"/>
+      <tuesday   hours="9:00-12:00 and 12:30-17:00"/>
+      <wednesday hours="9:00-12:00 and 12:30-17:00"/>
+      <thursday  hours="9:00-12:00 and 12:30-17:00"/>
+      <friday    hours="9:00-12:00 and 12:30-17:00"/>
+      <holiday period="01/07/2008 - 31/08/2008"/>
+    </business-calendar>
+  
+  </process-engine>
+
+  <environment>
+    <hibernate-session />
+    <transaction />
+    <pvm-db-session />
+    <job-db-session />
+    <message-session />
+    <timer-session />
+  </environment>
+
+</jbpm-configuration>


Property changes on: jbpm4/trunk/modules/pvm/src/test/resources/jbpm.cfg.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: jbpm4/trunk/modules/pvm/src/test/resources/logging.properties
===================================================================
--- jbpm4/trunk/modules/pvm/src/test/resources/logging.properties	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/pvm/src/test/resources/logging.properties	2008-12-26 12:28:51 UTC (rev 3550)
@@ -4,7 +4,7 @@
 
 redirect.commons.logging = enabled
 
-java.util.logging.ConsoleHandler.level = FINEST
+java.util.logging.ConsoleHandler.level = ERROR
 java.util.logging.ConsoleHandler.formatter = org.jbpm.log.LogFormatter
 
 # org.jbpm.util.ErrorTriggeredFileHandler.size = 500
@@ -14,12 +14,12 @@
 # For example, set the com.xyz.foo logger to only log SEVERE messages:
 # com.xyz.foo.level = SEVERE
 
-org.jbpm.level=FINEST
-org.jbpm.pvm.internal.tx.level=FINE
-org.jbpm.pvm.internal.wire.level=FINE
-org.jbpm.pvm.internal.util.level=FINE
+org.jbpm.level=INFO
+# org.jbpm.pvm.internal.tx.level=FINE
+# org.jbpm.pvm.internal.wire.level=FINE
+# org.jbpm.pvm.internal.util.level=FINE
 
-org.hibernate.cfg.HbmBinder.level=SEVERE
+org.hibernate.level=INFO
 org.hibernate.cfg.SettingsFactory.level=SEVERE
 # org.hibernate.level=FINE
 # org.hibernate.SQL.level=FINEST

Copied: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/AddCommentMessage.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/TestMessageCommand.java)
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/AddCommentMessage.java	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/AddCommentMessage.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.test.load;
+
+import java.util.Random;
+
+import org.hibernate.Session;
+import org.jbpm.env.Environment;
+import org.jbpm.log.Log;
+import org.jbpm.pvm.internal.job.MessageImpl;
+import org.jbpm.pvm.internal.model.CommentImpl;
+
+/**
+ * Message that stores a {@link CommentImpl} when it is executed, deletes
+ * itself from the session and notifies {@link MessageProcessingTest}.
+ *
+ * @author Tom Baeyens
+ */
+public class AddCommentMessage extends MessageImpl<Void>  {
+  
+  private static final long serialVersionUID = 1L;
+  static Random random = new Random(); // not referenced anywhere in this class
+  
+  public AddCommentMessage() {
+  }
+  /**
+   * Creates a message that carries the given text in the <code>info</code>
+   * field (not declared in this class; presumably inherited from
+   * MessageImpl -- confirm).
+   */
+  public AddCommentMessage(String message) {
+    this.info = message;
+  }
+  /**
+   * Saves a comment with the text of <code>info</code>, deletes this message
+   * from the hibernate session and calls MessageProcessingTest.commentAdded().
+   *
+   * @return always <code>null</code>
+   */
+  public Void execute(Environment environment) throws Exception {
+    CommentImpl comment = new CommentImpl(info);
+    Session session = environment.get(Session.class);
+    session.save(comment);
+    session.delete(this);
+    MessageProcessingTest.commentAdded();
+    return null;
+  }
+}

Added: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/Automatic.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/Automatic.java	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/Automatic.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.test.load;
+
+/**
+ * Class with a single method that does nothing.
+ *
+ * @author Tom Baeyens
+ */
+public class Automatic {
+
+  /** Intentionally empty. */
+  public void doNothing() {
+  }
+}


Property changes on: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/Automatic.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ExclusiveTestCommand.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ExclusiveTestCommand.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ExclusiveTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import org.hibernate.Session;
-import org.jbpm.Execution;
-import org.jbpm.cmd.Command;
-import org.jbpm.env.Environment;
-import org.jbpm.log.Log;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.model.ExecutionImpl;
-import org.jbpm.pvm.internal.wire.descriptor.LongDescriptor;
-import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
-
-
-/**
- * @author Tom Baeyens
- */
-public class ExclusiveTestCommand implements Command<Object> {
-
-  private static final long serialVersionUID = 1L;
-  private static final Log log = Log.getLog(ExclusiveTestCommand.class.getName());
-  static Random random = new Random();
-  
-  long executionId;
-  
-  public ExclusiveTestCommand() {
-  }
-
-  public static CommandMessage createMessage(Execution execution) {
-    CommandMessage commandMessage = new CommandMessage();
-    commandMessage.setExecution((ExecutionImpl) execution);
-    commandMessage.setExclusive(true);
-    
-    ObjectDescriptor commandDescriptor = new ObjectDescriptor(ExclusiveTestCommand.class);
-    commandDescriptor.addInjection("executionId", new LongDescriptor(execution.getDbid()));
-    commandMessage.setCommandDescriptor(commandDescriptor);
-    return commandMessage;
-  }
-
-  public Object execute(Environment environment) throws Exception {
-    Long threadId = Thread.currentThread().getId();
-    
-    Session session = environment.get(Session.class);
-    ExecutionImpl execution = (ExecutionImpl) session.get(ExecutionImpl.class, executionId);
-    
-    String executionKey = execution.getKey();
-
-    // exclusiveMessageIds maps execution keys to a set of thread ids.
-    // the idea is that for each execution, all the exclusive jobs will 
-    // be executed by 1 thread sequentially.  
-    
-    // in the end, each set should contain exactly 1 element  
-    Set<Long> groupMessages = JobExecutorTest.exclusiveThreadIds.get(executionKey);
-    if (groupMessages==null) {
-      groupMessages = new HashSet<Long>();
-      JobExecutorTest.exclusiveThreadIds.put(executionKey, groupMessages);
-    }
-    groupMessages.add(threadId);
-    
-    // let's assume that an average jobImpl takes between 0 and 150 millis to complete.
-    int workTime = random.nextInt(150);
-    log.debug("executing exclusive message for "+execution+".  this is going to take "+workTime+"ms");
-    try {
-      Thread.sleep(workTime);
-    } catch (RuntimeException e) {
-      log.debug("sleeping was interrupted");
-    }
-    
-    return null;
-  }
-
-}

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailOnceTestCommand.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailOnceTestCommand.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailOnceTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import org.jbpm.cmd.Command;
-import org.jbpm.env.Environment;
-import org.jbpm.log.Log;
-import org.jbpm.model.Comment;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.job.MessageImpl;
-import org.jbpm.pvm.internal.model.CommentImpl;
-import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
-import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
-import org.jbpm.session.DbSession;
-
-
-/**
- * @author Tom Baeyens
- */
-public class FailOnceTestCommand implements Command<Object> {
-
-  private static final long serialVersionUID = 1L;
-  private static final Log log = Log.getLog(FailOnceTestCommand.class.getName());
-  
-  int messageId;
-  
-  public FailOnceTestCommand() {
-  }
-
-  public static CommandMessage createMessage(int messageId) {
-    CommandMessage commandMessage = new CommandMessage();
-    ObjectDescriptor commandDescriptor = new ObjectDescriptor(FailOnceTestCommand.class);
-    commandDescriptor.addInjection("messageId", new IntegerDescriptor(messageId));
-    commandMessage.setCommandDescriptor(commandDescriptor);
-    return commandMessage;
-  }
-
-  public Object execute(Environment environment) throws Exception {
-    DbSession dbSession = environment.get(DbSession.class);
-
-    // this message execution should be rolled back
-    Comment comment = new CommentImpl(Integer.toString(messageId));
-    dbSession.save(comment);
-
-    if (!JobExecutorTest.failOnceMessageIds.contains(messageId)) {
-      // registering the failed message in a non-transactional resource
-      // so the messageId will still be added even after the transaction has rolled back
-      log.debug("adding failonce message "+messageId);
-      JobExecutorTest.failOnceMessageIds.add(messageId);
-      
-      throw new RuntimeException("failing once"); 
-    }
-
-    log.debug("message "+messageId+" now succeeds");
-
-    return null;
-  }
-
-}

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailingTestCommand.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailingTestCommand.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/FailingTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import org.jbpm.cmd.Command;
-import org.jbpm.env.Environment;
-import org.jbpm.model.Comment;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.model.CommentImpl;
-import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
-import org.jbpm.session.DbSession;
-
-
-/**
- * @author Tom Baeyens
- */
-public class FailingTestCommand implements Command<Object> {
-
-  private static final long serialVersionUID = 1L;
-
-  public Object execute(Environment environment) throws Exception {
-    DbSession dbSession = environment.get(DbSession.class);
-    
-    // this message execution should be rolled back
-    Comment comment = new CommentImpl("failing update");
-    dbSession.save(comment);
-    
-    throw new RuntimeException("ooops"); 
-  }
-
-  public static CommandMessage createMessage() {
-    CommandMessage commandMessage = new CommandMessage();
-    ObjectDescriptor commandDescriptor = new ObjectDescriptor(FailingTestCommand.class);
-    commandMessage.setCommandDescriptor(commandDescriptor);
-    return commandMessage;
-  }
-
-}

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/GenerateExceptionTestCommand.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/GenerateExceptionTestCommand.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/GenerateExceptionTestCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,66 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import org.jbpm.cmd.Command;
-import org.jbpm.env.Environment;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
-import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
-
-
-/**
- * @author Tom Baeyens
- * @author Guillaume Porcher
- * 
- * Simple command that will create an exception during execution. 
- * The exception will generate a stacktrace with variable length 
- * (controlled by the length parameter).
- * 
- * This class is to test the persistence of exception stacktrace in jobs.
- */
-public class GenerateExceptionTestCommand implements Command<Object> {
-
-  private static final long serialVersionUID = 1L;
-  
-  int length;
-  
-  public GenerateExceptionTestCommand() {
-  }
-  
-  public static CommandMessage createMessage(int recursionInitialDepth) {
-    CommandMessage commandMessage = new CommandMessage();
-    ObjectDescriptor commandDescriptor = new ObjectDescriptor(GenerateExceptionTestCommand.class);
-    commandDescriptor.addInjection("length", new IntegerDescriptor(recursionInitialDepth));
-    commandMessage.setCommandDescriptor(commandDescriptor);
-    return commandMessage;
-  }
-
-  public Object execute(Environment environment) throws Exception {
-    StringBuilder stringBuilder = new StringBuilder();
-    while (stringBuilder.length() < length) {
-      stringBuilder.append("This is a long test message. ");
-    }
-    throw new RuntimeException(stringBuilder.toString());
-  }
-
-}

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/JobExecutorTest.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/JobExecutorTest.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/JobExecutorTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,402 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.hibernate.Session;
-import org.jbpm.Execution;
-import org.jbpm.activity.ActivityExecution;
-import org.jbpm.activity.ExternalActivity;
-import org.jbpm.cmd.Command;
-import org.jbpm.cmd.CommandService;
-import org.jbpm.env.Environment;
-import org.jbpm.log.Log;
-import org.jbpm.model.Comment;
-import org.jbpm.pvm.internal.cmd.StartExecutionCmd;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.job.JobImpl;
-import org.jbpm.pvm.internal.jobexecutor.JobDbSession;
-import org.jbpm.pvm.internal.jobexecutor.JobExecutor;
-import org.jbpm.pvm.internal.model.CommentImpl;
-import org.jbpm.session.MessageSession;
-import org.jbpm.test.DbTestCase;
-
-/**
- * @author Tom Baeyens
- * @author Guillaume Porcher
- */
-public class JobExecutorTest extends DbTestCase {
-
-  private static final Log log = Log.getLog(JobExecutorTest.class.getName());
-
-  static List<Integer> processedMessageIds;
-  static Map<String, Set<Long>> exclusiveThreadIds;
-  static List<Integer> failOnceMessageIds;
-
-  static int nbrOfTestMessages = 1000;
-  static int timeoutMillis = 2 * 60 * 1000; // 10 minutes
-  static int checkInterval = 400;
-  static int nbrOfTestMessagesPerExecution = 5;
-  static int nbrOfTestExecutions = 5;
-  
-  CommandService commandService;
-
-  public void setUp() throws Exception {
-    super.setUp();
-    processedMessageIds = new ArrayList<Integer>();
-    exclusiveThreadIds = new HashMap<String, Set<Long>>();
-    failOnceMessageIds = new ArrayList<Integer>();
-    
-    commandService = processEngine.get(CommandService.class);
-  }
-
-  public void testSuccessfulMessageProcessing() {
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-      insertTestMessages();
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    List<Integer> processedMessageNumbers = commandService.execute(new Command<List<Integer>>() {
-      public List<Integer> execute(Environment environment) {
-        List<Integer> processedMessageNumbers = new ArrayList<Integer>();
-        Session session = environment.get(Session.class);
-        List<Comment> comments = session.createCriteria(CommentImpl.class).list();
-        for (Comment comment: comments) {
-          int processedMessageNumber = Integer.parseInt(comment.getMessage());
-          processedMessageNumbers.add(processedMessageNumber);
-          // make sure the db stays clean
-          session.delete(comment);
-        }
-        return processedMessageNumbers;
-      }
-    });
-    
-    for (int i = 0; i < nbrOfTestMessages; i++) {
-      assertTrue("message " + i + " is not processed: " + processedMessageNumbers, processedMessageNumbers.contains(i));
-    }
-  }
-
-  /*
-  public void testMessagesPresentUponJobExecutorStartUp() {
-
-    insertTestMessages();
-
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    for (int i = 0; i < nbrOfTestMessages; i++) {
-      assertTrue("message " + i + " is not processed", processedMessageIds.contains(i));
-    }
-  }
-
-  public void testExclusiveMessageProcessing() {
-    insertExclusiveTestMessages();
-
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    commandService.execute(new Command<Object>() {
-
-      public Object execute(Environment environment) throws Exception {
-        // exclusiveMessageIds maps execution keys to a set of thread ids.
-        // the idea is that for each execution, all the exclusive jobs will
-        // be executed by 1 thread sequentially.
-
-        for (int i = 0; i < nbrOfTestExecutions; i++) {
-          String executionKey = "execution-" + i;
-          Set<Long> threadIds = exclusiveThreadIds.get(executionKey);
-          assertNotNull("no thread id set for " + executionKey + " in: " + exclusiveThreadIds, threadIds);
-          assertEquals("exclusive messages for " + executionKey + " have been executed by multiple threads: " + threadIds, 1, threadIds.size());
-        }
-        return null;
-      }
-    });
-  }
-
-  public void testFailOnceMessages() {
-    failOnceMessageIds.clear();
-
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-      insertFailOnceTestMessages();
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    for (int i = 0; i < nbrOfTestMessages; i++) {
-      assertTrue("message " + i + " is not failed once: " + failOnceMessageIds, failOnceMessageIds.contains(i));
-    }
-    assertEquals(nbrOfTestMessages, failOnceMessageIds.size());
-    
-    log.debug("==== all messages processed, now checking if all messages have arrived exactly once ====");
-
-    commandService.execute(new Command<Object>() {
-
-      public Object execute(Environment environment) throws Exception {
-        Session session = environment.get(Session.class);
-        List<Comment> comments = session.createQuery("from " + CommentImpl.class.getName()).list();
-        
-        for (Comment comment : comments) {
-          log.debug("retrieved message: "+comment.getMessage());
-          Integer messageId = new Integer(comment.getMessage());
-          assertTrue("message " + messageId + " committed twice", failOnceMessageIds.remove(messageId));
-        }
-
-        assertTrue("not all messages made a successful commit: " + failOnceMessageIds, failOnceMessageIds.isEmpty());
-        return null;
-      }
-    });
-  }
-
-  public void testFailedMessageProcessing() {
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-      commandService.execute(new Command<Object>() {
-
-        public Object execute(Environment environment) throws Exception {
-          MessageSession messageSession = environment.get(MessageSession.class);
-          CommandMessage commandMessage = FailingTestCommand.createMessage();
-          messageSession.send(commandMessage);
-          return null;
-        }
-      });
-
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    commandService.execute(new Command<Object>() {
-
-      public Object execute(Environment environment) throws Exception {
-        PvmDbSession pvmDbSession = environment.get(PvmDbSession.class);
-        List<Job> deadJobs = pvmDbSession.findJobsWithException(0, 10);
-        assertEquals("there should be one dead job", 1, deadJobs.size());
-
-        Session session = environment.get(Session.class);
-        List commands = session.createQuery("from " + CommentImpl.class.getName()).list();
-        assertTrue("command insertion should have been rolled back", commands.isEmpty());
-        return null;
-      }
-    });
-  }
-
-  public void testExceptionInJob() {
-
-    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
-    jobExecutor.start();
-    try {
-      commandService.execute(new Command<Object>() {
-
-        // size of the error message to generate
-        // (currently there is a limit of 4000 characters)
-        int msgLength = 4100;
-
-        public Object execute(Environment environment) throws Exception {
-          MessageSession messageSession = environment.get(MessageSession.class);
-          CommandMessage commandMessage = GenerateExceptionTestCommand.createMessage(msgLength);
-          messageSession.send(commandMessage);
-          return null;
-        }
-      });
-
-      waitTillNoMoreMessages(jobExecutor);
-
-    } finally {
-      jobExecutor.stop(true);
-    }
-
-    commandService.execute(new Command<Object>() {
-
-      public Object execute(Environment environment) throws Exception {
-        PvmDbSession pvmDbSession = environment.get(PvmDbSession.class);
-        List<Job> deadJobs = pvmDbSession.findJobsWithException(0, 10);
-        assertEquals("there should be one dead jobImpl", 1, deadJobs.size());
-        return null;
-      }
-    });
-  }
-  */
-
-  // helper methods ///////////////////////////////////////////////////////////
-
-  public static class InsertMessageCmd implements Command<Object> {
-
-    private static final long serialVersionUID = 1L;
-    int i;
-
-    public InsertMessageCmd(int i) {
-      this.i = i;
-    }
-    public Object execute(Environment environment) throws Exception {
-      MessageSession messageSession = environment.get(MessageSession.class);
-      CommandMessage commandMessage = TestMessageCommand.createMessage(i);
-      messageSession.send(commandMessage);
-      return null;
-    }
-  }
-
-  void insertTestMessages() {
-    for (int i = 0; i < nbrOfTestMessages; i++) {
-      commandService.execute(new InsertMessageCmd(i));
-    }
-  }
-
-  public static class WaitState implements ExternalActivity {
-
-    private static final long serialVersionUID = 1L;
-
-    public void execute(ActivityExecution execution) throws Exception {
-      execution.waitForSignal();
-    }
-    public void signal(ActivityExecution execution, String signalName, Map<String, Object> parameters) throws Exception {
-      execution.take(signalName);
-    }
-  }
-
-  void insertExclusiveTestMessages() {
-    deployJpdlXmlString(
-      "<process name='excl'>" +
-      "  <start>" +
-      "    <flow to='a' />" +
-      "  </start>" +
-      "  <state name='a' />" +
-      "</process>"
-    );
-    
-    commandService.execute(new Command<Object>() {
-      public Object execute(Environment environment) throws Exception {
-        MessageSession messageSession = environment.get(MessageSession.class);
-        for (int i = 0; i < nbrOfTestExecutions; i++) {
-          Execution execution = 
-            new StartExecutionCmd("excl:1", null, "execution-" + i)
-              .execute(environment);
-
-          for (int j = 0; j < nbrOfTestMessagesPerExecution; j++) {
-            CommandMessage exclusiveTestMessage = ExclusiveTestCommand.createMessage(execution);
-            messageSession.send(exclusiveTestMessage);
-          }
-        }
-        return null;
-      }
-    });
-  }
-
-  public static class InsertFailOnceTestMsgCmd implements Command<Object> {
-
-    private static final long serialVersionUID = 1L;
-    int i;
-
-    public InsertFailOnceTestMsgCmd(int i) {
-      this.i = i;
-    }
-    public Object execute(Environment environment) throws Exception {
-      MessageSession messageSession = environment.get(MessageSession.class);
-      CommandMessage commandMessage = FailOnceTestCommand.createMessage(i);
-      messageSession.send(commandMessage);
-      return null;
-    }
-  }
-
-  void insertFailOnceTestMessages() {
-    for (int i = 0; i < nbrOfTestMessages; i++) {
-      commandService.execute(new InsertFailOnceTestMsgCmd(i));
-    }
-  }
-
-  private void waitTillNoMoreMessages(JobExecutor jobExecutor) {
-
-    // install a timer that will interrupt if it takes too long
-    // if that happens, it will lead to an interrupted exception and the test
-    // will fail
-    TimerTask interruptTask = new TimerTask() {
-
-      Thread testThread = Thread.currentThread();
-
-      public void run() {
-        log.debug("test " + getName() + " took too long. going to interrupt..." + testThread);
-        testThread.interrupt();
-      }
-    };
-    Timer timer = new Timer();
-    timer.schedule(interruptTask, timeoutMillis);
-
-    try {
-      boolean jobsAvailable = true;
-      while (jobsAvailable) {
-        log.debug("going to sleep for " + checkInterval + " millis, waiting for the jobImpl executor to process more jobs");
-        Thread.sleep(checkInterval);
-        jobsAvailable = areJobsAvailable();
-      }
-
-    } catch (InterruptedException e) {
-      fail("test execution exceeded treshold of " + timeoutMillis + " milliseconds");
-    } finally {
-      timer.cancel();
-    }
-  }
-
-  boolean areJobsAvailable() {
-    return commandService.execute(new Command<Boolean>() {
-      public Boolean execute(Environment environment) {
-        Session session = environment.get(Session.class);
-
-        Long jobs = (Long) session.createQuery("select count(*) from "+JobImpl.class.getName()).uniqueResult();
-        if (jobs.longValue()>0) {
-          log.debug("found more jobs to process");
-          return true;
-        }
-        return false;
-      }
-    });
-  }
-}

Copied: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/MessageProcessingTest.java (from rev 3540, jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/JobExecutorTest.java)
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/MessageProcessingTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/MessageProcessingTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,280 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.test.load;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.hibernate.Session;
+import org.jbpm.activity.ActivityExecution;
+import org.jbpm.activity.ExternalActivity;
+import org.jbpm.cmd.Command;
+import org.jbpm.cmd.CommandService;
+import org.jbpm.env.Environment;
+import org.jbpm.job.Message;
+import org.jbpm.log.Log;
+import org.jbpm.model.Comment;
+import org.jbpm.pvm.internal.cmd.CompositeCmd;
+import org.jbpm.pvm.internal.cmd.SendMessageCmd;
+import org.jbpm.pvm.internal.job.CommandMessage;
+import org.jbpm.pvm.internal.job.JobImpl;
+import org.jbpm.pvm.internal.jobexecutor.FailOnceTestCommand;
+import org.jbpm.pvm.internal.jobexecutor.JobExecutor;
+import org.jbpm.pvm.internal.model.CommentImpl;
+import org.jbpm.session.MessageSession;
+import org.jbpm.test.DbTestCase;
+
+/**
+ * @author Tom Baeyens
+ * @author Guillaume Porcher
+ */
+public class MessageProcessingTest extends DbTestCase {
+
+  private static final Log log = Log.getLog(MessageProcessingTest.class.getName());
+
+  static final long SECOND = 1000; // milliseconds in one second
+  static final long MINUTE = 60 * SECOND; // milliseconds in one minute
+  static final long HOUR = 60 * MINUTE; // milliseconds in one hour
+    
+  static long nbrOfTestMessages = 10000; // must be divisible by insertGroupSize (checked in setUp)
+  static long insertGroupSize = 10; // number of messages sent per CompositeCmd
+
+  static boolean measureMemory = true; // when true, target/memory.txt is opened in setUp and closed in tearDown
+
+  static long timeoutMillis = 30 * MINUTE; // 30 minutes; the waiting test thread is interrupted after this
+  static long checkInterval = 5000; // milliseconds slept between polls for remaining jobs
+
+  static long start = -1; // timestamp taken just before the job executor is started; -1 until then
+  static int commentsAdded = 0; // incremented through commentAdded()
+  
+  public static synchronized void commentAdded() {
+    commentsAdded++;
+  }
+
+  CommandService commandService;
+  PrintWriter memoryWriter;
+
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    if ( (nbrOfTestMessages % insertGroupSize) != 0 ) {
+      fail("nbrOfTestMessages ("+nbrOfTestMessages+") is not dividable by insertGroupSize ("+insertGroupSize+")");
+    }
+    
+    if (measureMemory) {
+      openMemoryLogFile();
+    }
+    
+    commandService = processEngine.get(CommandService.class);
+  }
+  
+  protected void tearDown() throws Exception {
+    if (measureMemory) {
+      closeMemoryLogFile();
+    }
+    
+    super.tearDown();
+  }
+
+
+  void openMemoryLogFile() throws Exception {
+    memoryWriter = new PrintWriter(new File("target/memory.txt"));
+  }
+
+  void closeMemoryLogFile() {
+    memoryWriter.close();
+  }
+
+  void logStatus() {
+    Runtime runtime = Runtime.getRuntime();
+    long total = runtime.totalMemory();
+    long free = runtime.freeMemory();
+    long used = total - free;
+    memoryWriter.println(used);
+    
+    log.info(commentsAdded+" msgs in "+formatDuration(start, System.currentTimeMillis())+" mem:"+used);
+  }
+
+  public void testSuccessfulMessageProcessing() {
+    JobExecutor jobExecutor = processEngine.get(JobExecutor.class);
+    
+    long stop = -1;
+    
+    try {
+      int messageIndex = 0;
+      // insert ${nbrOfTestMessages} messages... 
+      for (int i = 0; i < nbrOfTestMessages; i+=insertGroupSize) {
+        CompositeCmd compositeCmd = new CompositeCmd();
+        // ...in groups of ${insertGroupSize}
+        for (int j = 0; j <insertGroupSize ; j++) {
+          Message message = new AddCommentMessage(Integer.toString(messageIndex));
+          SendMessageCmd sendMessageCmd = new SendMessageCmd(message);
+          compositeCmd.addCommand(sendMessageCmd);
+          
+          messageIndex++;
+        }
+        commandService.execute(compositeCmd);
+        
+        log.info("added "+messageIndex+" messages");
+      }
+
+      start = System.currentTimeMillis();
+      
+      jobExecutor.start();
+
+      // wait till all messages are processed
+      waitTillNoMoreMessages(jobExecutor);
+
+    } finally {
+      stop = System.currentTimeMillis();
+      
+      jobExecutor.stop(true);
+    }
+    
+    log.info("processing "+nbrOfTestMessages+" messages took "+formatDuration(start, stop));
+
+    List<Integer> processedMessageNumbers = commandService.execute(new Command<List<Integer>>() {
+      public List<Integer> execute(Environment environment) {
+        List<Integer> processedMessageNumbers = new ArrayList<Integer>();
+        Session session = environment.get(Session.class);
+        List<Comment> comments = session.createCriteria(CommentImpl.class).list();
+        for (Comment comment: comments) {
+          int processedMessageNumber = Integer.parseInt(comment.getMessage());
+          processedMessageNumbers.add(processedMessageNumber);
+          // make sure the db stays clean
+          session.delete(comment);
+        }
+        return processedMessageNumbers;
+      }
+    });
+    
+    for (int i = 0; i < nbrOfTestMessages; i++) {
+      assertTrue("message " + i + " is not processed: " + processedMessageNumbers, processedMessageNumbers.contains(i));
+    }
+  }
+
+  // helper methods ///////////////////////////////////////////////////////////
+
+  public static String formatDuration(long start, long stop) {
+    long diff = stop-start;
+    
+    long hours = diff / HOUR;
+    diff = diff - (hours * HOUR);
+    
+    long minutes = diff / MINUTE;
+    diff = diff - (minutes * MINUTE);
+    
+    long seconds = diff / SECOND;
+    diff = diff - (seconds * SECOND);
+    
+    StringBuffer text = new StringBuffer();
+    if (hours!=0) {
+      text.append(hours+" hours, ");
+    }
+    if (minutes!=0) {
+      text.append(minutes+" minutes, ");
+    }
+    if (seconds!=0) {
+      text.append(seconds+" seconds and ");
+    }
+    if (diff!=0) {
+      text.append(diff+" ms");
+    }
+    return text.toString();
+  }
+
+
+  public static class InsertFailOnceTestMsgCmd implements Command<Object> {
+
+    private static final long serialVersionUID = 1L;
+    int i;
+
+    public InsertFailOnceTestMsgCmd(int i) {
+      this.i = i;
+    }
+    public Object execute(Environment environment) throws Exception {
+      MessageSession messageSession = environment.get(MessageSession.class);
+      CommandMessage commandMessage = FailOnceTestCommand.createMessage(i);
+      messageSession.send(commandMessage);
+      return null;
+    }
+  }
+
+  void insertFailOnceTestMessages() {
+    for (int i = 0; i < nbrOfTestMessages; i++) {
+      commandService.execute(new InsertFailOnceTestMsgCmd(i));
+    }
+  }
+
+  private void waitTillNoMoreMessages(JobExecutor jobExecutor) {
+
+    // install a timer that will interrupt if it takes too long
+    // if that happens, it will lead to an interrupted exception and the test
+    // will fail
+    TimerTask interruptTask = new TimerTask() {
+
+      Thread testThread = Thread.currentThread();
+
+      public void run() {
+        log.debug("test " + getName() + " took too long. going to interrupt..." + testThread);
+        testThread.interrupt();
+      }
+    };
+    Timer timer = new Timer();
+    timer.schedule(interruptTask, timeoutMillis);
+
+    try {
+      boolean jobsAvailable = true;
+      while (jobsAvailable) {
+        log.debug("going to sleep for " + checkInterval + " millis, waiting for the job executor to process more jobs");
+        Thread.sleep(checkInterval);
+        jobsAvailable = areJobsAvailable();
+        logStatus();
+      }
+
+    } catch (InterruptedException e) {
+      fail("test execution exceeded treshold of " + timeoutMillis + " milliseconds");
+    } finally {
+      timer.cancel();
+    }
+  }
+
+  boolean areJobsAvailable() {
+    return commandService.execute(new Command<Boolean>() {
+      public Boolean execute(Environment environment) {
+        Session session = environment.get(Session.class);
+
+        Long jobs = (Long) session.createQuery("select count(*) from "+JobImpl.class.getName()).uniqueResult();
+        if (jobs.longValue()>0) {
+          log.debug("found "+jobs+" more jobs to process");
+          return true;
+        }
+        return false;
+      }
+    });
+  }
+}

Added: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ProcessExecutionTest.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ProcessExecutionTest.java	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ProcessExecutionTest.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.test.load;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jbpm.Execution;
+import org.jbpm.log.Log;
+import org.jbpm.test.DbTestCase;
+
+
+/**
+ * @author Tom Baeyens
+ */
+public class ProcessExecutionTest extends DbTestCase {
+  
+  private static Log log = Log.getLog(ProcessExecutionTest.class.getName());
+  
+  static int processes = 2;
+  static int threads = 2;
+  
+  
+  Throwable exception;
+
+  public void testExecuteProcesses() throws Exception {
+    deployJpdlResource("org/jbpm/test/load/process.jpdl.xml");
+    
+    List<Thread> threadList = new ArrayList<Thread>();
+    for (int i=0; i<threads; i++) {
+      Thread thread = new ProcessExecutor();
+      thread.start();
+      threadList.add(thread);
+    }
+    
+    for (Thread thread: threadList) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        log.info(e.toString());
+      }
+      if (exception!=null) {
+        break;
+      }
+    }
+    
+    if (exception!=null) {
+      throw new Exception("thread threw: "+exception.getMessage(), exception);
+    }
+  }
+  
+  public class ProcessExecutor extends Thread {
+    public void run() {
+      try {
+        for (int i=0; i<processes; i++) {
+          executeProcess();
+        }
+      } catch (Throwable t) {
+        exception = t;
+      }
+    }
+    public void executeProcess() {
+      Execution execution = executionService.startExecutionByKey("Process");
+      assertEquals("c", execution.getNodeName());
+      String executionId = execution.getId();
+      execution = executionService.signalExecutionById(executionId);
+      assertTrue(execution.isEnded());
+    }
+  }
+
+
+}


Property changes on: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/ProcessExecutionTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Deleted: jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/TestMessageCommand.java
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/TestMessageCommand.java	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/java/org/jbpm/test/load/TestMessageCommand.java	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,79 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.test.load;
-
-import java.util.Random;
-
-import org.hibernate.Session;
-import org.jbpm.cmd.Command;
-import org.jbpm.env.Environment;
-import org.jbpm.log.Log;
-import org.jbpm.pvm.internal.job.CommandMessage;
-import org.jbpm.pvm.internal.model.CommentImpl;
-import org.jbpm.pvm.internal.wire.descriptor.IntegerDescriptor;
-import org.jbpm.pvm.internal.wire.descriptor.ObjectDescriptor;
-
-/**
- * @author Tom Baeyens
- */
-public class TestMessageCommand implements Command<Object> {
-  
-  private static final long serialVersionUID = 1L;
-  private static final Log log = Log.getLog(TestMessageCommand.class.getName());
-  static Random random = new Random();
-  
-  int messageId;
-  
-  public TestMessageCommand() {
-  }
-
-  public TestMessageCommand(int messageId) {
-    this.messageId = messageId;
-  }
-
-  public Object execute(Environment environment) throws Exception {
-    String msgText = Integer.toString(messageId);
-    CommentImpl comment = new CommentImpl(msgText);
-    environment.get(Session.class).save(comment);
-    
-    /*
-    // let's assume that an average job takes between 0 and 150 millis to complete.
-    int workTime = random.nextInt(150);
-    log.debug("executing test message "+messageId+".  this is going to take "+workTime+"ms");
-    try {
-      Thread.sleep(workTime);
-    } catch (RuntimeException e) {
-      log.debug("sleeping was interrupted");
-    }
-    */
-    
-    return null;
-  }
-
-  public static CommandMessage createMessage(int i) {
-    CommandMessage commandMessage = new CommandMessage();
-    ObjectDescriptor commandDescriptor = new ObjectDescriptor(TestMessageCommand.class);
-    commandDescriptor.addInjection("messageId", new IntegerDescriptor(i));
-    commandMessage.setCommandDescriptor(commandDescriptor);
-    return commandMessage;
-  }
-}

Modified: jbpm4/trunk/modules/test-load/src/test/resources/jbpm.cfg.xml
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/resources/jbpm.cfg.xml	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/resources/jbpm.cfg.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -33,6 +33,7 @@
       <mapping resource="jbpm.pvm.variable.hbm.xml" />
       <mapping resource="jbpm.pvm.job.hbm.xml" />
       <mapping resource="jbpm.jpdl.hbm.xml" />
+      <mapping resource="jbpm.load.hbm.xml" />
       <cache-configuration resource="jbpm.pvm.cache.xml" 
                            usage="nonstrict-read-write" />
     </hibernate-configuration>

Added: jbpm4/trunk/modules/test-load/src/test/resources/jbpm.load.hbm.xml
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/resources/jbpm.load.hbm.xml	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/resources/jbpm.load.hbm.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,14 @@
+<?xml version="1.0"?>
+
+<!DOCTYPE hibernate-mapping PUBLIC
+      "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
+      "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
+
+<hibernate-mapping default-access="field">
+
+	<subclass name="org.jbpm.test.load.AddCommentMessage" 
+         extends="org.jbpm.pvm.internal.job.MessageImpl" 
+         discriminator-value="AddMsg">
+  </subclass>
+  
+</hibernate-mapping>
\ No newline at end of file


Property changes on: jbpm4/trunk/modules/test-load/src/test/resources/jbpm.load.hbm.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: jbpm4/trunk/modules/test-load/src/test/resources/logging.properties
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/resources/logging.properties	2008-12-24 22:03:07 UTC (rev 3549)
+++ jbpm4/trunk/modules/test-load/src/test/resources/logging.properties	2008-12-26 12:28:51 UTC (rev 3550)
@@ -1,11 +1,11 @@
-handlers= java.util.logging.ConsoleHandler
+handlers=java.util.logging.ConsoleHandler
 # to add the error triggered file handler
 # handlers= java.util.logging.ConsoleHandler org.jbpm.util.ErrorTriggeredFileHandler
 
-redirect.commons.logging = enabled
+redirect.commons.logging=enabled
 
-java.util.logging.ConsoleHandler.level = ERROR
-java.util.logging.ConsoleHandler.formatter = org.jbpm.log.LogFormatter
+java.util.logging.ConsoleHandler.level=FINEST
+java.util.logging.ConsoleHandler.formatter=org.jbpm.log.LogFormatter
 
 # org.jbpm.util.ErrorTriggeredFileHandler.size = 500
 # org.jbpm.util.ErrorTriggeredFileHandler.push = OFF
@@ -14,11 +14,15 @@
 # For example, set the com.xyz.foo logger to only log SEVERE messages:
 # com.xyz.foo.level = SEVERE
 
-org.jbpm.level=FINEST
-org.jbpm.pvm.internal.tx.level=FINE
-org.jbpm.pvm.internal.wire.level=FINE
-org.jbpm.pvm.internal.util.level=FINE
+org.jbpm.level=INFO
+org.jbpm.pvm.internal.model.level=FINE
+org.jbpm.pvm.internal.model.op.level=FINE
+# org.jbpm.pvm.internal.tx.level=FINE
+# org.jbpm.pvm.internal.wire.level=FINE
+# org.jbpm.pvm.internal.util.level=FINE
 
+org.jbpm.test.load.level=INFO
+
 org.hibernate.cfg.HbmBinder.level=SEVERE
 org.hibernate.cfg.SettingsFactory.level=SEVERE
 # org.hibernate.level=FINE

Added: jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/process.jpdl.xml
===================================================================
--- jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/process.jpdl.xml	                        (rev 0)
+++ jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/process.jpdl.xml	2008-12-26 12:28:51 UTC (rev 3550)
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<process name="Process" xmlns="http://jbpm.org/4/jpdl">
+
+  <start>
+    <flow to="a" />
+  </start>
+
+  <java name="a" 
+        class="org.jbpm.test.load.Automatic"
+        method="doNothing">
+    <flow to="b" />
+  </java>
+  
+  <java name="b" 
+        class="org.jbpm.test.load.Automatic"
+        method="doNothing">
+    <flow to="c" />
+  </java>
+  
+  <state name="c">
+    <flow to="d" />
+  </state>
+
+  <java name="d" 
+        class="org.jbpm.test.load.Automatic"
+        method="doNothing">
+    <flow to="e" />
+  </java>
+  
+  <java name="e" 
+        class="org.jbpm.test.load.Automatic"
+        method="doNothing">
+    <flow to="end" />
+  </java>
+  
+  <end name="end" />
+  
+</process>


Property changes on: jbpm4/trunk/modules/test-load/src/test/resources/org/jbpm/test/load/process.jpdl.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF




More information about the jbpm-commits mailing list