[jbpm-commits] JBoss JBPM SVN: r2967 - in jbpm3/branches/jpdl-3.2.2-SOA-4.2: jpdl/jar/src/main/java/org/jbpm/job/executor and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Nov 17 18:03:50 EST 2008
Author: alex.guizar at jboss.com
Date: 2008-11-17 18:03:50 -0500 (Mon, 17 Nov 2008)
New Revision: 2967
Modified:
jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java
Log:
reverted r2965 in order to apply issue fixes separately and incorporate test cases
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/enterprise/src/test/java/org/jbpm/msg/AsyncProcessingTest.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -36,28 +36,18 @@
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.command.Command;
-import org.jbpm.command.DeployProcessCommand;
-import org.jbpm.command.StartProcessInstanceCommand;
import org.jbpm.ejb.LocalCommandService;
import org.jbpm.ejb.LocalCommandServiceHome;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ProcessInstance;
public class AsyncProcessingTest extends ServletTestCase {
-
- private LocalCommandService commandService;
-
+
static int nbrOfConcurrentProcessExecutions = 20;
static int maxWaitTime = 30000;
protected static JbpmConfiguration jbpmConfiguration = JbpmConfiguration.getInstance();
- protected void setUp() throws Exception {
- InitialContext initialContext = new InitialContext();
- LocalCommandServiceHome localCommandServiceHome = (LocalCommandServiceHome) initialContext.lookup("java:comp/env/ejb/LocalCommandServiceBean");
- commandService = localCommandServiceHome.create();
- }
-
public void testBulkJobs() {
Recorder.resetCollections();
deleteAllJobs();
@@ -69,7 +59,7 @@
assertEquals(expectedResults, Recorder.collectedResults);
}
- protected void deleteAllJobs() {
+ public void deleteAllJobs() {
execute( new Command() {
private static final long serialVersionUID = 1L;
public Object execute(JbpmContext jbpmContext) throws Exception {
@@ -80,59 +70,86 @@
});
}
- protected Object execute(Command command) {
- return commandService.execute(command);
+ public static Object execute(Command command) {
+ Object returnValue;
+ try {
+ InitialContext initialContext = new InitialContext();
+ LocalCommandServiceHome localCommandServiceHome = (LocalCommandServiceHome) initialContext.lookup("CommandServiceBean");
+ LocalCommandService localCommandService = localCommandServiceHome.create();
+ returnValue = localCommandService.execute(command);
+ localCommandService.remove();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("couldn't execute command", e);
+ }
+ return returnValue;
}
- protected void deployProcess() {
+ public void deployProcess() {
log.debug("start deploy process");
- execute(new DeployProcessCommand(
- "<process-definition name='bulk messages'>" +
- " <start-state>" +
- " <transition to='a' />" +
- " </start-state>" +
- " <node name='a' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='b' />" +
- " </node>" +
- " <node name='b' async='true'>" +
- " <event type='node-enter'>" +
- " <action name='X' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </event>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='c' />" +
- " </node>" +
- " <node name='c' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='d'>" +
- " <action name='Y' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </transition>" +
- " </node>" +
- " <node name='d' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='e' />" +
- " <event type='node-leave'>" +
- " <action name='Z' async='true' class='"+ActionRecorder.class.getName()+"' />" +
- " </event>" +
- " </node>" +
- " <node name='e' async='true'>" +
- " <action class='"+Recorder.class.getName()+"' />" +
- " <transition to='end' />" +
- " </node>" +
- " <end-state name='end'/>" +
- "</process-definition>"));
+ execute(new Command() {
+ private static final long serialVersionUID = 1L;
+ public Object execute(JbpmContext jbpmContext) throws Exception {
+ ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(
+ "<process-definition name='bulk messages'>" +
+ " <start-state>" +
+ " <transition to='a' />" +
+ " </start-state>" +
+ " <node name='a' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='b' />" +
+ " </node>" +
+ " <node name='b' async='true'>" +
+ " <event type='node-enter'>" +
+ " <action name='X' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </event>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='c' />" +
+ " </node>" +
+ " <node name='c' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='d'>" +
+ " <action name='Y' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </transition>" +
+ " </node>" +
+ " <node name='d' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='e' />" +
+ " <event type='node-leave'>" +
+ " <action name='Z' async='true' class='"+ActionRecorder.class.getName()+"' />" +
+ " </event>" +
+ " </node>" +
+ " <node name='e' async='true'>" +
+ " <action class='"+Recorder.class.getName()+"' />" +
+ " <transition to='end' />" +
+ " </node>" +
+ " <end-state name='end'/>" +
+ "</process-definition>"
+ );
+
+ log.debug("deploying process");
+ jbpmContext.deployProcessDefinition(processDefinition);
+
+ return processDefinition;
+ }
+ });
}
- protected void launchProcesses() {
+ public void launchProcesses() {
for (int i=0; i<nbrOfConcurrentProcessExecutions; i++) {
- StartProcessInstanceCommand command = new StartProcessInstanceCommand();
- command.setProcessName("bulk messages");
- execute(command);
+ execute(new Command() {
+ private static final long serialVersionUID = 1L;
+ public Object execute(JbpmContext jbpmContext) throws Exception {
+ ProcessInstance processInstance = jbpmContext.newProcessInstanceForUpdate("bulk messages");
+ processInstance.signal();
+ return processInstance;
+ }
+ });
}
}
- protected Set createExpectedResults() {
+ public Set createExpectedResults() {
Set expectedResults = new TreeSet();
Iterator iter = Recorder.collectedProcessInstanceIds.iterator();
while (iter.hasNext()) {
@@ -178,7 +195,7 @@
}
}
- protected int getNbrOfJobsAvailable() {
+ public static int getNbrOfJobsAvailable() {
Integer nbrOfJobsAvailable = (Integer) execute( new Command() {
private static final long serialVersionUID = 1L;
public Object execute(JbpmContext jbpmContext) throws Exception {
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorServlet.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -107,9 +107,4 @@
}
return defaultValue;
}
-
- public void destroy() {
- super.destroy();
- jbpmConfiguration.close();
- }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/job/executor/JobExecutorThread.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -4,7 +4,6 @@
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -12,6 +11,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.Hibernate;
+import org.hibernate.StaleStateException;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.db.JobSession;
@@ -94,69 +94,60 @@
currentIdleInterval = currentIdleInterval*2;
}
}
- } catch (Exception e) {
- // NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
- e.printStackTrace();
+ } catch (Throwable t) {
+ t.printStackTrace();
} finally {
log.info(getName()+" leaves cyberspace");
}
}
protected Collection acquireJobs() {
- Collection acquiredJobs;
+ Collection acquiredJobs = null;
synchronized (jobExecutor) {
Collection jobsToLock = new ArrayList();
log.debug("acquiring jobs for execution...");
JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
try {
- JobSession jobSession = jbpmContext.getJobSession();
- log.debug("querying for acquirable job...");
- Job job = jobSession.getFirstAcquirableJob(getName());
- if (job!=null) {
- if (job.isExclusive()) {
- log.debug("exclusive acquirable job found ("+job+"). querying for other exclusive jobs to lock them all in one tx...");
- List otherExclusiveJobs = jobSession.findExclusiveJobs(getName(), job.getProcessInstance());
- jobsToLock.addAll(otherExclusiveJobs);
- log.debug("trying to obtain a process-instance exclusive locks for '"+otherExclusiveJobs+"'");
+ try {
+ JobSession jobSession = jbpmContext.getJobSession();
+ log.debug("querying for acquirable job...");
+ Job job = jobSession.getFirstAcquirableJob(getName());
+ if (job!=null) {
+ if (job.isExclusive()) {
+ log.debug("exclusive acquirable job found ("+job+"). querying for other exclusive jobs to lock them all in one tx...");
+ List otherExclusiveJobs = jobSession.findExclusiveJobs(getName(), job.getProcessInstance());
+ jobsToLock.addAll(otherExclusiveJobs);
+ log.debug("trying to obtain a process-instance exclusive locks for '"+otherExclusiveJobs+"'");
+ } else {
+ log.debug("trying to obtain a lock for '"+job+"'");
+ jobsToLock.add(job);
+ }
+
+ Iterator iter = jobsToLock.iterator();
+ while (iter.hasNext()) {
+ job = (Job) iter.next();
+ job.setLockOwner(getName());
+ job.setLockTime(new Date());
+ // jbpmContext.getSession().update(job);
+ }
+
+ // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
+ if (job instanceof Timer) {
+ Hibernate.initialize(((Timer)job).getGraphElement());
+ }
+
} else {
- log.debug("trying to obtain a lock for '"+job+"'");
- jobsToLock.add(job);
+ log.debug("no acquirable jobs in job table");
}
- Iterator iter = jobsToLock.iterator();
- while (iter.hasNext()) {
- job = (Job) iter.next();
- job.setLockOwner(getName());
- job.setLockTime(new Date());
- // jbpmContext.getSession().update(job);
- }
-
- // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
- if (job instanceof Timer) {
- Hibernate.initialize(((Timer)job).getGraphElement());
- }
- } else {
- log.debug("no acquirable jobs in job table");
- }
- } finally {
- try {
+ } finally {
jbpmContext.close();
- acquiredJobs = jobsToLock;
- log.debug("obtained lock on jobs: "+acquiredJobs);
}
- catch (JbpmPersistenceException e) {
- // if this is a stale object exception, the jbpm configuration has control over the logging
- if ("org.hibernate.StaleObjectStateException".equals(e.getCause().getClass().getName())) {
- log.info("problem committing job acquisition transaction: optimistic locking failed");
- StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job acquisition transaction: optimistic locking failed", e);
- } else {
- // TODO run() will log this exception, log it here too?
- log.error("problem committing job acquisition transaction", e);
- throw e;
- }
- acquiredJobs = Collections.EMPTY_LIST;
- log.debug("couldn't obtain lock on jobs: "+jobsToLock);
- }
+ acquiredJobs = jobsToLock;
+ log.debug("obtained locks on following jobs: "+acquiredJobs);
+
+ } catch (StaleStateException e) {
+ log.debug("couldn't acquire lock on job(s): "+jobsToLock);
}
}
return acquiredJobs;
@@ -197,10 +188,12 @@
log.info("problem committing job execution transaction: optimistic locking failed");
StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job execution transaction: optimistic locking failed", e);
} else {
- // TODO run() will log this exception, log it here too?
log.error("problem committing job execution transaction", e);
- throw e;
}
+ } catch (RuntimeException e) {
+ log.error("problem committing job execution transaction", e);
+
+ throw e;
}
}
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceService.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -1,28 +1,6 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
package org.jbpm.persistence.jta;
import javax.naming.InitialContext;
-import javax.naming.NamingException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
@@ -41,45 +19,55 @@
private static Log log = LogFactory.getLog(JbpmContext.class);
- private UserTransaction userTransaction;
+ boolean isJtaTxCreated = false;
public JtaDbPersistenceService(DbPersistenceServiceFactory persistenceServiceFactory) {
super(persistenceServiceFactory);
- if (!isJtaTransactionInProgress()) {
+ if (! isCurrentJtaTransactionAvailable()) {
beginJtaTransaction();
+ isJtaTxCreated = true;
}
}
protected boolean isTransactionActive() {
- return isJtaTxCreated() ;
+ return isJtaTxCreated() ;
}
public void close() {
super.close();
- if (userTransaction != null) {
+ if (isJtaTxCreated) {
endJtaTransaction();
}
}
- boolean isJtaTransactionInProgress() {
- SessionFactoryImplementor sessionFactory = (SessionFactoryImplementor) persistenceServiceFactory.getSessionFactory();
- return JTAHelper.isTransactionInProgress(sessionFactory);
+ boolean isCurrentJtaTransactionAvailable() {
+ SessionFactoryImplementor sessionFactoryImplementor = (SessionFactoryImplementor) persistenceServiceFactory.getSessionFactory();
+ return JTAHelper.isTransactionInProgress(sessionFactoryImplementor);
}
void beginJtaTransaction() {
try {
log.debug("start user JTA transaction");
- userTransaction = getUserTransaction();
- userTransaction.begin();
+ getUserTransaction().begin();
} catch (Exception e) {
throw new JbpmException("couldn't start JTA transaction", e);
}
}
void endJtaTransaction() {
- if (isRollbackOnly() || JTAHelper.isMarkedForRollback(getJtaTransactionStatus())) {
+ int status = -1;
+ log.debug("end user JTA transaction");
+ UserTransaction userTransaction = getUserTransaction();
+ try {
+ status = userTransaction.getStatus();
+ } catch (SystemException e) {
+ throw new JbpmException("couldn't get status for user transaction", e);
+ }
+
+ boolean isRollback = JTAHelper.isRollback(status);
+ if (isRollback || isRollbackOnly()) {
log.debug("end jta transation with ROLLBACK");
try {
userTransaction.rollback();
@@ -95,37 +83,25 @@
}
}
}
-
+
UserTransaction getUserTransaction() {
- String jndiName = persistenceServiceFactory.getConfiguration().getProperty("jta.UserTransaction");
- if (jndiName == null) {
- /*
- * EJB 2.1 §20.9 The container must make the UserTransaction interface available to the
- * enterprise beans that are allowed to use this interface (only session and message-
- * driven beans with bean-managed transaction demarcation are allowed to use this
- * interface) in JNDI under the name java:comp/UserTransaction.
- * J2EE 1.4 §4.2.1.1 The J2EE platform must provide an object implementing the
- * UserTransaction interface to all web components. The platform must publish the
- * UserTransaction object in JNDI under the name java:comp/UserTransaction.
- */
- jndiName = "java:comp/UserTransaction";
+ UserTransaction userTransaction = null;
+ if (userTransaction == null) {
+ String jndiName = "UserTransaction";
+ try {
+ userTransaction = (UserTransaction) new InitialContext().lookup(jndiName);
+ } catch (Exception e) {
+ throw new JbpmException("couldn't lookup UserTransaction in JNDI with name "+jndiName, e);
+ }
}
- try {
- return (UserTransaction) new InitialContext().lookup(jndiName);
- } catch (NamingException e) {
- throw new JbpmException("couldn't lookup UserTransaction in JNDI with name "+jndiName, e);
- }
+ return userTransaction;
}
- int getJtaTransactionStatus() {
- try {
- return userTransaction.getStatus();
- } catch (SystemException e) {
- throw new JbpmException("couldn't get status for user transaction", e);
- }
- }
public boolean isJtaTxCreated() {
- return userTransaction != null;
+ return isJtaTxCreated;
}
+ public void setJtaTxCreated(boolean isJtaTxCreated) {
+ this.isJtaTxCreated = isJtaTxCreated;
+ }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -16,4 +16,7 @@
public Service openService() {
return new JtaDbPersistenceService(this);
}
+
+ public void close() {
+ }
}
Modified: jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java
===================================================================
--- jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java 2008-11-17 22:14:39 UTC (rev 2966)
+++ jbpm3/branches/jpdl-3.2.2-SOA-4.2/jpdl/jar/src/main/java/org/jbpm/svc/Services.java 2008-11-17 23:03:50 UTC (rev 2967)
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.hibernate.StaleObjectStateException;
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.graph.exe.ProcessInstance;
@@ -213,7 +214,7 @@
public void close() {
if (services!=null) {
Map closeExceptions = new HashMap();
- Exception firstException = null;
+ Throwable firstException = null;
Iterator iter = serviceNames.iterator();
while (iter.hasNext()) {
String serviceName = (String) iter.next();
@@ -230,10 +231,6 @@
} else {
log.error("problem closing service '"+serviceName+"'", e);
}
- closeExceptions.put(serviceName, e);
- if (firstException==null) {
- firstException = e;
- }
} catch (Exception e) {
// NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
log.error("problem closing service '"+serviceName+"'", e);
@@ -244,12 +241,8 @@
}
}
}
- if (!closeExceptions.isEmpty()) {
- if (firstException instanceof JbpmException) {
- throw (JbpmException) firstException;
- } else {
- throw new JbpmException("problem closing services: "+closeExceptions, firstException);
- }
+ if (! closeExceptions.isEmpty()) {
+ throw new JbpmException("problem closing services "+closeExceptions, firstException);
}
}
}
More information about the jbpm-commits mailing list