[jbpm-commits] JBoss JBPM SVN: r2965 - in jbpm3/branches/jpdl-3.2.2-SOA-4.2: jpdl/jar/src/main/java/org/jbpm/job/executor and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Nov 17 12:00:21 EST 2008
Author: tom.baeyens at jboss.com
Date: 2008-11-17 12:00:21 -0500 (Mon, 17 Nov 2008)
New Revision: 2965
Modified:
jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java
Log:
Julien's fixes for JBPM-1072, JBPM-1186 and JBPM-1179
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -36,18 +36,28 @@
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.command.Command;
+import org.jbpm.command.DeployProcessCommand;
+import org.jbpm.command.StartProcessInstanceCommand;
import org.jbpm.ejb.LocalCommandService;
import org.jbpm.ejb.LocalCommandServiceHome;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ProcessInstance;
public class AsyncProcessingTest extends ServletTestCase {
-
+
+ private LocalCommandService commandService;
+
static int nbrOfConcurrentProcessExecutions = 20;
static int maxWaitTime = 30000;
protected static JbpmConfiguration jbpmConfiguration = JbpmConfiguration.getInstance();
+ protected void setUp() throws Exception {
+ InitialContext initialContext = new InitialContext();
+ LocalCommandServiceHome localCommandServiceHome = (LocalCommandServiceHome) initialContext.lookup("java:comp/env/ejb/LocalCommandServiceBean");
+ commandService = localCommandServiceHome.create();
+ }
+
public void testBulkJobs() {
Recorder.resetCollections();
deleteAllJobs();
@@ -59,7 +69,7 @@
assertEquals(expectedResults, Recorder.collectedResults);
}
- public void deleteAllJobs() {
+ protected void deleteAllJobs() {
execute( new Command() {
private static final long serialVersionUID = 1L;
public Object execute(JbpmContext jbpmContext) throws Exception {
@@ -70,86 +80,59 @@
});
}
- public static Object execute(Command command) {
- Object returnValue;
- try {
- InitialContext initialContext = new InitialContext();
- LocalCommandServiceHome localCommandServiceHome = (LocalCommandServiceHome) initialContext.lookup("CommandServiceBean");
- LocalCommandService localCommandService = localCommandServiceHome.create();
- returnValue = localCommandService.execute(command);
- localCommandService.remove();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException("couldn't execute command", e);
- }
- return returnValue;
+ protected Object execute(Command command) {
+ return commandService.execute(command);
}
- public void deployProcess() {
+ protected void deployProcess() {
log.debug("start deploy process");
- execute(new Command() {
- private static final long serialVersionUID = 1L;
- public Object execute(JbpmContext jbpmContext) throws Exception {
- ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(
- "<process-definition name='bulk messages'>" +
- " <start-state>" +
- " <transition to='a' />" +
- " </start-state>" +
- " <node name='a' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='b' />" +
- " </node>" +
- " <node name='b' async='true'>" +
- " <event type='node-enter'>" +
- " <action name='X' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </event>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='c' />" +
- " </node>" +
- " <node name='c' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='d'>" +
- " <action name='Y' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </transition>" +
- " </node>" +
- " <node name='d' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='e' />" +
- " <event type='node-leave'>" +
- " <action name='Z' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </event>" +
- " </node>" +
- " <node name='e' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='end' />" +
- " </node>" +
- " <end-state name='end'/>" +
- "</process-definition>"
- );
-
- log.debug("deploying process");
- jbpmContext.deployProcessDefinition(processDefinition);
-
- return processDefinition;
- }
- });
+ execute(new DeployProcessCommand(
+ "<process-definition name='bulk messages'>" +
+ " <start-state>" +
+ " <transition to='a' />" +
+ " </start-state>" +
+ " <node name='a' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='b' />" +
+ " </node>" +
+ " <node name='b' async='true'>" +
+ " <event type='node-enter'>" +
+ " <action name='X' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </event>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='c' />" +
+ " </node>" +
+ " <node name='c' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='d'>" +
+ " <action name='Y' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </transition>" +
+ " </node>" +
+ " <node name='d' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='e' />" +
+ " <event type='node-leave'>" +
+ " <action name='Z' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </event>" +
+ " </node>" +
+ " <node name='e' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='end' />" +
+ " </node>" +
+ " <end-state name='end'/>" +
+ "</process-definition>"));
}
- public void launchProcesses() {
+ protected void launchProcesses() {
for (int i=0; i<nbrOfConcurrentProcessExecutions; i++) {
- execute(new Command() {
- private static final long serialVersionUID = 1L;
- public Object execute(JbpmContext jbpmContext) throws Exception {
- ProcessInstance processInstance = jbpmContext.newProcessInstanceForUpdate("bulk messages");
- processInstance.signal();
- return processInstance;
- }
- });
+ StartProcessInstanceCommand command = new StartProcessInstanceCommand();
+ command.setProcessName("bulk messages");
+ execute(command);
}
}
- public Set createExpectedResults() {
+ protected Set createExpectedResults() {
Set expectedResults = new TreeSet();
Iterator iter = Recorder.collectedProcessInstanceIds.iterator();
while (iter.hasNext()) {
@@ -195,7 +178,7 @@
}
}
- public static int getNbrOfJobsAvailable() {
+ protected int getNbrOfJobsAvailable() {
Integer nbrOfJobsAvailable = (Integer) execute( new Command() {
private static final long serialVersionUID = 1L;
public Object execute(JbpmContext jbpmContext) throws Exception {
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -107,4 +107,9 @@
}
return defaultValue;
}
+
+ public void destroy() {
+ super.destroy();
+ jbpmConfiguration.close();
+ }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -4,6 +4,7 @@
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -11,7 +12,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.Hibernate;
-import org.hibernate.StaleStateException;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.db.JobSession;
@@ -94,60 +94,69 @@
currentIdleInterval = currentIdleInterval*2;
}
}
- } catch (Throwable t) {
- t.printStackTrace();
+ } catch (Exception e) {
+ // NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
+ e.printStackTrace();
} finally {
log.info(getName()+" leaves cyberspace");
}
}
protected Collection acquireJobs() {
- Collection acquiredJobs = null;
+ Collection acquiredJobs;
synchronized (jobExecutor) {
Collection jobsToLock = new ArrayList();
log.debug("acquiring jobs for execution...");
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
- try {
- JobSession jobSession = jbpmContext.getJobSession();
- log.debug("querying for acquirable job...");
- Job job = jobSession.getFirstAcquirableJob(getName());
- if (job!=null) {
- if (job.isExclusive()) {
- log.debug("exclusive acquirable job found ("+job+"). querying for other exclusive jobs to lock them all in one tx...");
- List otherExclusiveJobs = jobSession.findExclusiveJobs(getName(), job.getProcessInstance());
- jobsToLock.addAll(otherExclusiveJobs);
- log.debug("trying to obtain a process-instance exclusive locks for '"+otherExclusiveJobs+"'");
- } else {
- log.debug("trying to obtain a lock for '"+job+"'");
- jobsToLock.add(job);
- }
-
- Iterator iter = jobsToLock.iterator();
- while (iter.hasNext()) {
- job = (Job) iter.next();
- job.setLockOwner(getName());
- job.setLockTime(new Date());
- // jbpmContext.getSession().update(job);
- }
-
- // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
- if (job instanceof Timer) {
- Hibernate.initialize(((Timer)job).getGraphElement());
- }
-
+ JobSession jobSession = jbpmContext.getJobSession();
+ log.debug("querying for acquirable job...");
+ Job job = jobSession.getFirstAcquirableJob(getName());
+ if (job!=null) {
+ if (job.isExclusive()) {
+ log.debug("exclusive acquirable job found ("+job+"). querying for other exclusive jobs to lock them all in one tx...");
+ List otherExclusiveJobs = jobSession.findExclusiveJobs(getName(), job.getProcessInstance());
+ jobsToLock.addAll(otherExclusiveJobs);
+ log.debug("trying to obtain a process-instance exclusive locks for '"+otherExclusiveJobs+"'");
} else {
- log.debug("no acquirable jobs in job table");
+ log.debug("trying to obtain a lock for '"+job+"'");
+ jobsToLock.add(job);
}
- } finally {
+ Iterator iter = jobsToLock.iterator();
+ while (iter.hasNext()) {
+ job = (Job) iter.next();
+ job.setLockOwner(getName());
+ job.setLockTime(new Date());
+ // jbpmContext.getSession().update(job);
+ }
+
+ // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
+ if (job instanceof Timer) {
+ Hibernate.initialize(((Timer)job).getGraphElement());
+ }
+ } else {
+ log.debug("no acquirable jobs in job table");
+ }
+ } finally {
+ try {
jbpmContext.close();
+ acquiredJobs = jobsToLock;
+ log.debug("obtained lock on jobs: "+acquiredJobs);
}
- acquiredJobs = jobsToLock;
- log.debug("obtained locks on following jobs: "+acquiredJobs);
-
- } catch (StaleStateException e) {
- log.debug("couldn't acquire lock on job(s): "+jobsToLock);
+ catch (JbpmPersistenceException e) {
+ // if this is a stale object exception, the jbpm configuration has control over the logging
+ if ("org.hibernate.StaleObjectStateException".equals(e.getCause().getClass().getName())) {
+ log.info("problem committing job acquisition transaction: optimistic locking failed");
+ StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job acquisition transaction: optimistic locking failed", e);
+ } else {
+ // TODO run() will log this exception, log it here too?
+ log.error("problem committing job acquisition transaction", e);
+ throw e;
+ }
+ acquiredJobs = Collections.EMPTY_LIST;
+ log.debug("couldn't obtain lock on jobs: "+jobsToLock);
+ }
}
}
return acquiredJobs;
@@ -188,12 +197,10 @@
log.info("problem committing job execution transaction: optimistic locking failed");
StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job execution transaction: optimistic locking failed", e);
} else {
+ // TODO run() will log this exception, log it here too?
log.error("problem committing job execution transaction", e);
+ throw e;
}
- } catch (RuntimeException e) {
- log.error("problem committing job execution transaction", e);
-
- throw e;
}
}
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -1,6 +1,28 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jbpm.persistence.jta;
import javax.naming.InitialContext;
+import javax.naming.NamingException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
@@ -19,55 +41,45 @@
private static Log log = LogFactory.getLog(JbpmContext.class);
- boolean isJtaTxCreated = false;
+ private UserTransaction userTransaction;
public JtaDbPersistenceService(DbPersistenceServiceFactory persistenceServiceFactory) {
super(persistenceServiceFactory);
- if (! isCurrentJtaTransactionAvailable()) {
+ if (!isJtaTransactionInProgress()) {
beginJtaTransaction();
- isJtaTxCreated = true;
}
}
protected boolean isTransactionActive() {
- return isJtaTxCreated() ;
+ return isJtaTxCreated() ;
}
public void close() {
super.close();
- if (isJtaTxCreated) {
+ if (userTransaction != null) {
endJtaTransaction();
}
}
- boolean isCurrentJtaTransactionAvailable() {
- SessionFactoryImplementor sessionFactoryImplementor = (SessionFactoryImplementor) persistenceServiceFactory.getSessionFactory();
- return JTAHelper.isTransactionInProgress(sessionFactoryImplementor);
+ boolean isJtaTransactionInProgress() {
+ SessionFactoryImplementor sessionFactory = (SessionFactoryImplementor) persistenceServiceFactory.getSessionFactory();
+ return JTAHelper.isTransactionInProgress(sessionFactory);
}
void beginJtaTransaction() {
try {
log.debug("start user JTA transaction");
- getUserTransaction().begin();
+ userTransaction = getUserTransaction();
+ userTransaction.begin();
} catch (Exception e) {
throw new JbpmException("couldn't start JTA transaction", e);
}
}
void endJtaTransaction() {
- int status = -1;
- log.debug("end user JTA transaction");
- UserTransaction userTransaction = getUserTransaction();
- try {
- status = userTransaction.getStatus();
- } catch (SystemException e) {
- throw new JbpmException("couldn't get status for user transaction", e);
- }
-
- boolean isRollback = JTAHelper.isRollback(status);
- if (isRollback || isRollbackOnly()) {
+ if (isRollbackOnly() || JTAHelper.isMarkedForRollback(getJtaTransactionStatus())) {
log.debug("end jta transation with ROLLBACK");
try {
userTransaction.rollback();
@@ -83,25 +95,37 @@
}
}
}
-
+
UserTransaction getUserTransaction() {
- UserTransaction userTransaction = null;
- if (userTransaction == null) {
- String jndiName = "UserTransaction";
- try {
- userTransaction = (UserTransaction) new InitialContext().lookup(jndiName);
- } catch (Exception e) {
- throw new JbpmException("couldn't lookup UserTransaction in JNDI with name "+jndiName, e);
- }
+ String jndiName = persistenceServiceFactory.getConfiguration().getProperty("jta.UserTransaction");
+ if (jndiName == null) {
+ /*
+ * EJB 2.1 §20.9 The container must make the UserTransaction interface available to the
+ * enterprise beans that are allowed to use this interface (only session and message-
+ * driven beans with bean-managed transaction demarcation are allowed to use this
+ * interface) in JNDI under the name java:comp/UserTransaction.
+ * J2EE 1.4 §4.2.1.1 The J2EE platform must provide an object implementing the
+ * UserTransaction interface to all web components. The platform must publish the
+ * UserTransaction object in JNDI under the name java:comp/UserTransaction.
+ */
+ jndiName = "java:comp/UserTransaction";
}
- return userTransaction;
+ try {
+ return (UserTransaction) new InitialContext().lookup(jndiName);
+ } catch (NamingException e) {
+ throw new JbpmException("couldn't lookup UserTransaction in JNDI with name "+jndiName, e);
+ }
}
+ int getJtaTransactionStatus() {
+ try {
+ return userTransaction.getStatus();
+ } catch (SystemException e) {
+ throw new JbpmException("couldn't get status for user transaction", e);
+ }
+ }
public boolean isJtaTxCreated() {
- return isJtaTxCreated;
+ return userTransaction != null;
}
- public void setJtaTxCreated(boolean isJtaTxCreated) {
- this.isJtaTxCreated = isJtaTxCreated;
- }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -16,7 +16,4 @@
public Service openService() {
return new JtaDbPersistenceService(this);
}
-
- public void close() {
- }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java 2008-11-17 16:59:31 UTC (rev 2964)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java 2008-11-17 17:00:21 UTC (rev 2965)
@@ -30,7 +30,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.hibernate.StaleObjectStateException;
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.graph.exe.ProcessInstance;
@@ -214,7 +213,7 @@
public void close() {
if (services!=null) {
Map closeExceptions = new HashMap();
- Throwable firstException = null;
+ Exception firstException = null;
Iterator iter = serviceNames.iterator();
while (iter.hasNext()) {
String serviceName = (String) iter.next();
@@ -231,6 +230,10 @@
} else {
log.error("problem closing service '"+serviceName+"'", e);
}
+ closeExceptions.put(serviceName, e);
+ if (firstException==null) {
+ firstException = e;
+ }
} catch (Exception e) {
// NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
log.error("problem closing service '"+serviceName+"'", e);
@@ -241,8 +244,12 @@
}
}
}
- if (! closeExceptions.isEmpty()) {
- throw new JbpmException("problem closing services "+closeExceptions, firstException);
+ if (!closeExceptions.isEmpty()) {
+ if (firstException instanceof JbpmException) {
+ throw (JbpmException) firstException;
+ } else {
+ throw new JbpmException("problem closing services: "+closeExceptions, firstException);
+ }
}
}
}
More information about the jbpm-commits
mailing list