[jbpm-commits] JBoss JBPM SVN: r6687 - in jbpm3/branches/jbpm-3.2-soa: enterprise/src/main/java/org/jbpm/ejb/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Sep 28 05:17:12 EDT 2010
Author: alex.guizar at jboss.com
Date: 2010-09-28 05:17:11 -0400 (Tue, 28 Sep 2010)
New Revision: 6687
Added:
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java
Removed:
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java
Modified:
jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
Log:
JBPM-2945 acquire a jms connection each time one is needed instead of reusing a cached connection;
employ separate queues for timers and other jobs to prevent timers from falling behind
Modified: jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml 2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,10 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- This file defines the default Queues and Topics that jBPM ships with.
- The default Queues and Topics are used by the Command Listener Bean
- and the producer-consumer pair formed by the JMS Message Service and
- the Job Listener Bean.
+ This file defines the default queues that jBPM ships with.
+ The default queues are used by the Command Listener Bean
+ and the producer-consumer pair formed by the JMS Connector Service and
+ the Job/Timer Listener Bean.
You can add other destinations to this file, or you can create other
*-service.xml files to contain your application's destinations.
@@ -23,6 +23,14 @@
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=JbpmTimerQueue"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ <attribute name="RedeliveryDelay">1000</attribute>
+ </mbean>
+
+ <mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=JbpmCommandQueue"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -93,9 +93,7 @@
private MessageDrivenContext messageDrivenContext;
private LocalCommandService commandService;
-
private ConnectionFactory jmsConnectionFactory;
- private Connection jmsConnection;
private static final Log log = LogFactory.getLog(CommandListenerBean.class);
@@ -104,18 +102,13 @@
// extract command from message
Command command = extractCommand(message);
if (command == null) return;
+
+ // execute command via local command executor bean
+ Object result;
try {
- // execute command via local command executor bean
- Object result = commandService.execute(command);
- // send a response back if a "reply to" destination is set
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo != null && (result instanceof Serializable || result == null)) {
- sendResult((Serializable) result, replyTo, message.getJMSMessageID());
- }
+ result = commandService.execute(command);
}
catch (RuntimeException e) {
- // MDBs are not supposed to throw exceptions
- messageDrivenContext.setRollbackOnly();
// if this is a locking exception, keep it quiet
if (DbPersistenceService.isLockingException(e)) {
StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
@@ -124,7 +117,18 @@
else {
log.error("failed to execute " + command, e);
}
+ // MDBs are not supposed to throw exceptions
+ messageDrivenContext.setRollbackOnly();
+ return;
}
+
+ // send the result back if a "reply-to" destination is set
+ Destination replyTo;
+ if (jmsConnectionFactory != null
+ && (replyTo = message.getJMSReplyTo()) != null
+ && (result instanceof Serializable || result == null)) {
+ sendResult((Serializable) result, replyTo, message.getJMSMessageID());
+ }
}
catch (JMSException e) {
messageDrivenContext.setRollbackOnly();
@@ -152,44 +156,28 @@
private void sendResult(Serializable result, Destination destination, String correlationId)
throws JMSException {
if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
- Session jmsSession = createSession();
+ Connection jmsConnection = jmsConnectionFactory.createConnection();
try {
+ /*
+ * if the connection supports xa, the session will be transacted, else the session will
+ * auto acknowledge; in either case no explicit transaction control must be performed -
+ * see ejb 2.1 - 17.3.5
+ */
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Message resultMessage = jmsSession.createObjectMessage(result);
resultMessage.setJMSCorrelationID(correlationId);
jmsSession.createProducer(destination).send(resultMessage);
}
finally {
- jmsSession.close();
+ jmsConnection.close();
}
}
- private Session createSession() throws JMSException {
- if (jmsConnection == null) {
- // create jms connection
- jmsConnection = jmsConnectionFactory.createConnection();
- }
- /*
- * if the connection supports xa, the session will be transacted, else the session will auto
- * acknowledge; in either case no explicit transaction control must be performed - see ejb
- * 2.1 - 17.3.5
- */
- return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
this.messageDrivenContext = messageDrivenContext;
}
public void ejbRemove() {
- if (jmsConnection != null) {
- try {
- jmsConnection.close();
- }
- catch (JMSException e) {
- log.debug("failed to close jms connection", e);
- }
- jmsConnection = null;
- }
jmsConnectionFactory = null;
commandService = null;
messageDrivenContext = null;
@@ -198,18 +186,23 @@
public void ejbCreate() {
try {
Context jndiContext = new InitialContext();
- jmsConnectionFactory = (ConnectionFactory) jndiContext.lookup("java:comp/env/jms/JbpmConnectionFactory");
-
+ // create command service
LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome) jndiContext.lookup("java:comp/env/ejb/LocalCommandServiceBean");
commandService = commandServiceHome.create();
-
+ // look for optional connection factory
+ try {
+ jmsConnectionFactory = (ConnectionFactory) jndiContext.lookup("java:comp/env/jms/JbpmConnectionFactory");
+ }
+ catch (NamingException e) {
+ log.warn("error retrieving connection factory, results will not be sent back", e);
+ }
jndiContext.close();
}
catch (NamingException e) {
throw new JbpmException("error retrieving command service home", e);
}
catch (CreateException e) {
- throw new JbpmException("error creating command service", e);
+ throw new JbpmException("failed to create command service", e);
}
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml 2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,22 +1,26 @@
<jbpm-configuration>
- <bean name="jbpm.jms.connector.service" class="org.jbpm.jms.JmsConnectorServiceFactory"
- singleton="true">
- <field name="jbpmConfiguration">
- <ref bean="jbpm.configuration" />
- </field>
- </bean>
-
<jbpm-context>
<service name="persistence" factory="org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory" />
<service name="message">
<factory>
- <ref bean="jbpm.jms.connector.service" />
+ <bean class="org.jbpm.jms.JmsConnectorServiceFactory" singleton="true">
+ <field name="jbpmConfiguration">
+ <ref bean="jbpm.configuration" />
+ </field>
+ </bean>
</factory>
</service>
<service name="scheduler">
<factory>
- <ref bean="jbpm.jms.connector.service" />
+ <bean class="org.jbpm.jms.JmsConnectorServiceFactory" singleton="true">
+ <field name="jbpmConfiguration">
+ <ref bean="jbpm.configuration" />
+ </field>
+ <field name="destinationJndiName">
+ <string value="java:comp/env/jms/TimerQueue" />
+ </field>
+ </bean>
</factory>
</service>
<service name="tx" factory="org.jbpm.tx.TxServiceFactory" />
Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java (from rev 6686, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.ejb;
+
+import java.io.Serializable;
+
+import javax.annotation.Resource;
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.EJB;
+import javax.ejb.MessageDriven;
+import javax.ejb.MessageDrivenContext;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.command.Command;
+import org.jbpm.persistence.db.DbPersistenceService;
+import org.jbpm.persistence.db.StaleObjectLogConfigurer;
+
+/**
+ * This message-driven bean listens for {@link ObjectMessage object messages} containing a
+ * command instance. The received commands are executed by the {@link CommandServiceBean command
+ * service} bean, using the local interface.
+ *
+ * The body of the message must be a Java object that implements the {@link Command} interface.
+ * The message properties, if any, are ignored.
+ *
+ * <h3>Environment</h3>
+ *
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
+ *
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td><code>ejb/LocalCommandService</code></td>
+ * <td>EJB Reference</td>
+ * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on
+ * a separate jBPM context.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JbpmConnectionFactory</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the factory that provides JMS connections for producing result messages.
+ * Required for command messages that indicate a reply destination.</td>
+ * </tr>
+ * </table>
+ *
+ * @author Alejandro Guizar
+ */
+@MessageDriven(activationConfig = {
+ @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
+})
+public class CommandListenerBean implements MessageListener {
+
+ private static final long serialVersionUID = 1L;
+
+ @Resource
+ private MessageDrivenContext messageDrivenContext;
+ @EJB(name = "ejb/LocalCommandService")
+ private LocalCommandService commandService;
+ @Resource(name = "jms/JbpmConnectionFactory", shareable = true)
+ private ConnectionFactory jmsConnectionFactory;
+
+ private static final Log log = LogFactory.getLog(CommandListenerBean.class);
+
+ public void onMessage(Message message) {
+ try {
+ // extract command from message
+ Command command = extractCommand(message);
+ if (command == null) return;
+
+ // execute command via local command executor bean
+ Object result;
+ try {
+ result = commandService.execute(command);
+ }
+ catch (RuntimeException e) {
+ // if this is a locking exception, keep it quiet
+ if (DbPersistenceService.isLockingException(e)) {
+ StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
+ + command, e);
+ }
+ else {
+ log.error("failed to execute " + command, e);
+ }
+ // MDBs are not supposed to throw exceptions
+ messageDrivenContext.setRollbackOnly();
+ return;
+ }
+
+ // send a response back if a "reply to" destination is set
+ Destination replyTo;
+ if (jmsConnectionFactory != null
+ && (replyTo = message.getJMSReplyTo()) != null
+ && (result instanceof Serializable || result == null)) {
+ sendResult((Serializable) result, replyTo, message.getJMSMessageID());
+ }
+ }
+ catch (JMSException e) {
+ messageDrivenContext.setRollbackOnly();
+ log.error("failed to process message " + message, e);
+ }
+ }
+
+ /**
+ * Retrieves a {@link Command} instance from the given message, which is assumed to be an
+ * {@link ObjectMessage}.
+ * <p>
+ * Subclasses may override this method to materialize the command in some other way.
+ * </p>
+ */
+ protected Command extractCommand(Message message) throws JMSException {
+ if (message instanceof ObjectMessage) {
+ ObjectMessage objectMessage = (ObjectMessage) message;
+ Serializable object = objectMessage.getObject();
+ if (object instanceof Command) {
+ return (Command) object;
+ }
+ else {
+ log.warn("not a command: " + object);
+ }
+ }
+ else {
+ log.warn("not an object message: " + message);
+ }
+ return null;
+ }
+
+ private void sendResult(Serializable result, Destination destination, String correlationId)
+ throws JMSException {
+ if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
+ Connection jmsConnection = jmsConnectionFactory.createConnection();
+ try {
+ /*
+ * if the connection supports xa, the session will be transacted, else the session will
+ * auto acknowledge; in either case no explicit transaction control must be performed -
+ * see ejb 2.1 - 17.3.5
+ */
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Message resultMessage = jmsSession.createObjectMessage(result);
+ resultMessage.setJMSCorrelationID(correlationId);
+ jmsSession.createProducer(destination).send(resultMessage);
+ }
+ finally {
+ jmsConnection.close();
+ }
+ }
+}
Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java (from rev 6683, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.ejb;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import javax.annotation.Resources;
+import javax.ejb.SessionContext;
+import javax.ejb.Stateless;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.sql.DataSource;
+
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
+import org.jbpm.JbpmException;
+import org.jbpm.command.Command;
+import org.jbpm.jms.JmsConnectorServiceFactory;
+import org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory;
+
+/**
+ * Stateless session bean that executes {@linkplain Command commands} by calling their
+ * {@link Command#execute(JbpmContext) execute} method on a separate {@link JbpmContext jBPM
+ * context}.
+ *
+ * <h3>Environment</h3>
+ *
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
+ *
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td><code>JbpmCfgResource</code></td>
+ * <td>Environment Entry</td>
+ * <td>The classpath resource from which to read the {@linkplain JbpmConfiguration jBPM
+ * configuration}. Optional, defaults to <code>
+ * jbpm.cfg.xml</code>.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jdbc/JbpmDataSource</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the data source that provides JDBC connections to the
+ * {@linkplain JtaDbPersistenceServiceFactory persistence service}. Must match the
+ * <code>hibernate.connection.datasource</code> property in the Hibernate configuration file.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JbpmConnectionFactory</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the factory that provides JMS connections to the
+ * {@linkplain JmsConnectorServiceFactory JMS connector service}. Required for processes that
+ * contain asynchronous continuations.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JobQueue</code></td>
+ * <td>Message Destination Reference</td>
+ * <td>The message service sends job messages to the queue referenced here. Must be
+ * the same queue from which the {@linkplain JobListenerBean job listener bean} receives
+ * messages.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/TimerQueue</code></td>
+ * <td>Message Destination Reference</td>
+ * <td>The scheduler service sends timer messages to the queue referenced here. Must be the
+ * same queue from which the {@linkplain TimerListenerBean timer listener bean} receives
+ * messages.</td>
+ * </tr>
+ * </table>
+ *
+ * @author Alejandro Guizar
+ */
+@Stateless
+@Resources(value = {
+ @Resource(name = "jdbc/JbpmDataSource", type = DataSource.class, shareable = true),
+ @Resource(name = "jms/JbpmConnectionFactory", type = ConnectionFactory.class, shareable = true),
+ @Resource(name = "jms/JobQueue", type = Queue.class),
+ @Resource(name = "jms/TimerQueue", type = Queue.class)
+})
+public class CommandServiceBean implements LocalCommandService {
+
+ private static final long serialVersionUID = 1L;
+
+ @Resource
+ private SessionContext sessionContext;
+ @Resource(name = "JbpmCfgResource")
+ private String jbpmCfgResource;
+
+ private JbpmConfiguration jbpmConfiguration;
+
+ /**
+ * Creates the {@link JbpmConfiguration} to be used by this command service. In case the
+ * environment key <code>JbpmCfgResource</code> is specified, that value is interpreted as the
+ * name of the configuration resource to load from the classpath. If that key is absent, the
+ * default configuration file will be used (jbpm.cfg.xml).
+ */
+ @PostConstruct
+ void createConfiguration() {
+ jbpmConfiguration = JbpmConfiguration.getInstance(jbpmCfgResource);
+ }
+
+ public Object execute(Command command) {
+ JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
+ try {
+ Object result = command.execute(jbpmContext);
+ // check whether command requested a rollback
+ if (jbpmContext.getServices().getTxService().isRollbackOnly()) {
+ sessionContext.setRollbackOnly();
+ }
+ return result;
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new JbpmException("failed to execute " + command, e);
+ }
+ finally {
+ jbpmContext.close();
+ }
+ }
+}
Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java (from rev 6684, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,43 @@
+package org.jbpm.ejb;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.command.Command;
+import org.jbpm.jms.ExecuteJobCommand;
+
+/**
+ * Message-driven bean that listens for {@link Message messages} containing a reference to a
+ * pending {@linkplain org.jbpm.job.Job job} for asynchronous continuations.
+ * <p>
+ * The message must have a <code>long</code> property called <code>jobId</code> which identifies
+ * a job in the database. The message body, if any, is ignored.
+ * </p>
+ * <h3>Environment</h3>
+ * <p>
+ * This bean inherits its environment entries and resources available for customization from
+ * {@link CommandListenerBean}.
+ * </p>
+ *
+ * @author Alejandro Guizar
+ */
+public class JobListenerBean extends CommandListenerBean {
+
+ private static final long serialVersionUID = 1L;
+ private static final Log log = LogFactory.getLog(JobListenerBean.class);
+
+ protected Command extractCommand(Message message) throws JMSException {
+ // checking for jobId property
+ if (message.propertyExists("jobId")) {
+ long jobId = message.getLongProperty("jobId");
+ return new ExecuteJobCommand(jobId);
+ }
+ else {
+ log.warn("property jobId not found");
+ }
+ return null;
+ }
+}
Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,214 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.ejb.impl;
-
-import java.io.Serializable;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.EJB;
-import javax.ejb.MessageDriven;
-import javax.ejb.MessageDrivenContext;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmException;
-import org.jbpm.command.Command;
-import org.jbpm.ejb.LocalCommandService;
-import org.jbpm.persistence.db.DbPersistenceService;
-import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-
-/**
- * This message-driven bean listens for {@link ObjectMessage object messages} containing a
- * command instance. The received commands are executed by the {@link CommandServiceBean command
- * service} bean, using the local interface.
- *
- * The body of the message must be a Java object that implements the {@link Command} interface.
- * The message properties, if any, are ignored.
- *
- * <h3>Environment</h3>
- *
- * <p>
- * The environment entries and resources available for customization are summarized in the table
- * below.
- * </p>
- *
- * <table border="1">
- * <tr>
- * <th>Name</th>
- * <th>Type</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td><code>ejb/LocalCommandService</code></td>
- * <td>EJB Reference</td>
- * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on
- * a separate jBPM context.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JbpmConnectionFactory</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections for producing result messages.
- * Required for command messages that indicate a reply destination.</td>
- * </tr>
- * </table>
- *
- * @author Alejandro Guizar
- */
-@MessageDriven(activationConfig = {
- @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
-})
-public class CommandListenerBean implements MessageListener {
-
- private static final long serialVersionUID = 1L;
-
- @Resource
- private MessageDrivenContext messageDrivenContext;
- @EJB(name = "ejb/LocalCommandService")
- private LocalCommandService commandService;
- @Resource(name = "jms/JbpmConnectionFactory")
- private ConnectionFactory jmsConnectionFactory;
-
- private Connection jmsConnection;
-
- private static final Log log = LogFactory.getLog(CommandListenerBean.class);
-
- @PostConstruct
- void createConnection() {
- try {
- jmsConnection = jmsConnectionFactory.createConnection();
- }
- catch (JMSException e) {
- throw new JbpmException("failed to create jms connection", e);
- }
- }
-
- @PreDestroy
- void closeConnection() {
- if (jmsConnection != null) {
- try {
- jmsConnection.close();
- }
- catch (JMSException e) {
- log.debug("failed to close jms connection", e);
- }
- }
- }
-
- public void onMessage(Message message) {
- try {
- // extract command from message
- Command command = extractCommand(message);
- if (command == null) return;
-
- // execute command via local command executor bean
- Object result;
- try {
- result = commandService.execute(command);
- }
- catch (RuntimeException e) {
- // MDBs are not supposed to throw exceptions
- messageDrivenContext.setRollbackOnly();
- // if this is a locking exception, keep it quiet
- if (DbPersistenceService.isLockingException(e)) {
- StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
- + command, e);
- }
- else {
- log.error("failed to execute " + command, e);
- }
- // the exception becomes the result
- result = e;
- }
- // send a response back if a "reply to" destination is set
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo != null && (result instanceof Serializable || result == null)) {
- sendResult((Serializable) result, replyTo, message.getJMSMessageID());
- }
- }
- catch (JMSException e) {
- messageDrivenContext.setRollbackOnly();
- log.error("failed to process message " + message, e);
- }
- }
-
- /**
- * Retrieves a {@link Command} instance from the given message, which is assumed to be an
- * {@link ObjectMessage}.
- * <p>
- * Subclasses may override this method to materialize the command in some other way.
- * </p>
- */
- protected Command extractCommand(Message message) throws JMSException {
- if (message instanceof ObjectMessage) {
- ObjectMessage objectMessage = (ObjectMessage) message;
- Serializable object = objectMessage.getObject();
- if (object instanceof Command) {
- return (Command) object;
- }
- else {
- log.warn("not a command: " + object);
- }
- }
- else {
- log.warn("not an object message: " + message);
- }
- return null;
- }
-
- private void sendResult(Serializable result, Destination destination, String correlationId)
- throws JMSException {
- if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
- Session jmsSession = createSession();
- try {
- Message resultMessage = jmsSession.createObjectMessage(result);
- resultMessage.setJMSCorrelationID(correlationId);
- jmsSession.createProducer(destination).send(resultMessage);
- }
- finally {
- jmsSession.close();
- }
- }
-
- private Session createSession() throws JMSException {
- if (jmsConnection == null) {
- jmsConnection = jmsConnectionFactory.createConnection();
- }
- /*
- * if the connection supports xa, the session will be transacted, else the session will auto
- * acknowledge; in either case no explicit transaction control must be performed - see ejb
- * 2.1 - 17.3.5
- */
- return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-}
Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.ejb.impl;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import javax.annotation.Resources;
-import javax.ejb.SessionContext;
-import javax.ejb.Stateless;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.sql.DataSource;
-
-import org.jbpm.JbpmConfiguration;
-import org.jbpm.JbpmContext;
-import org.jbpm.JbpmException;
-import org.jbpm.command.Command;
-import org.jbpm.ejb.LocalCommandService;
-import org.jbpm.jms.JmsConnectorServiceFactory;
-import org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory;
-
-/**
- * Stateless session bean that executes {@linkplain Command commands} by calling their
- * {@link Command#execute(JbpmContext) execute} method on a separate {@link JbpmContext jBPM
- * context}.
- *
- * <h3>Environment</h3>
- *
- * <p>
- * The environment entries and resources available for customization are summarized in the table
- * below.
- * </p>
- *
- * <table border="1">
- * <tr>
- * <th>Name</th>
- * <th>Type</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td><code>JbpmCfgResource</code></td>
- * <td>Environment Entry</td>
- * <td>The classpath resource from which to read the {@linkplain JbpmConfiguration jBPM
- * configuration}. Optional, defaults to <code>
- * jbpm.cfg.xml</code>.</td>
- * </tr>
- * <tr>
- * <td><code>jdbc/JbpmDataSource</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the data source that provides JDBC connections to the
- * {@linkplain JtaDbPersistenceServiceFactory persistence service}. Must match the
- * <code>hibernate.connection.datasource</code> property in the Hibernate configuration file.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JbpmConnectionFactory</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections to the
- * {@linkplain JmsConnectorServiceFactory JMS connector service}. Required for processes that
- * contain asynchronous continuations.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JobQueue</code></td>
- * <td>Message Destination Reference</td>
- * <td>The message service sends job messages to the queue referenced here. To ensure this is
- * the same queue from which the {@linkplain JobListenerBean job listener bean} receives
- * messages, the <code>message-destination-link</code> points to a common logical destination,
- * <code>JobQueue</code>.</td>
- * </tr>
- * </table>
- *
- * @author Alejandro Guizar
- */
-@Stateless
-@Resources(value = {
- @Resource(name = "jdbc/JbpmDataSource", type = DataSource.class),
- @Resource(name = "jms/JbpmConnectionFactory", type = ConnectionFactory.class),
- @Resource(name = "jms/JobQueue", type = Queue.class)
-})
-public class CommandServiceBean implements LocalCommandService {
-
- private static final long serialVersionUID = 1L;
-
- @Resource
- private SessionContext sessionContext;
- @Resource(name = "JbpmCfgResource")
- private String jbpmCfgResource;
-
- private JbpmConfiguration jbpmConfiguration;
-
- /**
- * Creates the {@link JbpmConfiguration} to be used by this command service. In case the
- * environment key <code>JbpmCfgResource</code> is specified, that value is interpreted as the
- * name of the configuration resource to load from the classpath. If that key is absent, the
- * default configuration file will be used (jbpm.cfg.xml).
- */
- @PostConstruct
- void createConfiguration() {
- jbpmConfiguration = JbpmConfiguration.getInstance(jbpmCfgResource);
- }
-
- public Object execute(Command command) {
- JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
- try {
- Object result = command.execute(jbpmContext);
- // check whether command requested a rollback
- if (jbpmContext.getServices().getTxService().isRollbackOnly()) {
- sessionContext.setRollbackOnly();
- }
- return result;
- }
- catch (RuntimeException e) {
- throw e;
- }
- catch (Exception e) {
- throw new JbpmException("failed to execute " + command, e);
- }
- finally {
- jbpmContext.close();
- }
- }
-}
Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java 2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,49 +0,0 @@
-package org.jbpm.ejb.impl;
-
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.MessageDriven;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.command.Command;
-import org.jbpm.jms.ExecuteJobCommand;
-
-/**
- * Message-driven bean that listens for {@link Message messages} containing a reference to a
- * pending {@linkplain org.jbpm.job.Job job} for asynchronous continuations.
- * <p>
- * The message must have a <code>long</code> property called <code>jobId</code> which identifies
- * a job in the database. The message body, if any, is ignored.
- * </p>
- * <h3>Environment</h3>
- * <p>
- * This bean inherits its environment entries and resources available for customization from
- * {@link CommandListenerBean}.
- * </p>
- *
- * @author Alejandro Guizar
- */
-@MessageDriven(messageListenerInterface = MessageListener.class, activationConfig = {
- @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
-})
-public class JobListenerBean extends CommandListenerBean {
-
- private static final long serialVersionUID = 1L;
- private static final Log log = LogFactory.getLog(JobListenerBean.class);
-
- protected Command extractCommand(Message message) throws JMSException {
- // checking for jobId property
- if (message.propertyExists("jobId")) {
- long jobId = message.getLongProperty("jobId");
- return new ExecuteJobCommand(jobId);
- }
- else {
- log.warn("property jobId not found");
- }
- return null;
- }
-}
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml 2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml 2010-09-28 09:17:11 UTC (rev 6687)
@@ -28,19 +28,28 @@
<message-destination-ref>
<description>
- The message service sends job messages to the queue referenced here. To ensure this is the
- same queue from which the JobListenerBean receives messages, the message-destination-link
- element points to a common logical destination, JobQueue.
+ The message service sends job messages to the queue referenced here.
+ Must be the same queue from which the JobListenerBean receives messages.
</description>
<message-destination-ref-name>jms/JobQueue</message-destination-ref-name>
<message-destination-usage>Produces</message-destination-usage>
<mapped-name>queue/JbpmJobQueue</mapped-name>
</message-destination-ref>
+
+ <message-destination-ref>
+ <description>
+ The scheduler service sends timer messages to the queue referenced here.
+ Must be the same queue from which the TimerListenerBean receives messages.
+ </description>
+ <message-destination-ref-name>jms/TimerQueue</message-destination-ref-name>
+ <message-destination-usage>Produces</message-destination-usage>
+ <mapped-name>queue/JbpmTimerQueue</mapped-name>
+ </message-destination-ref>
</session>
<message-driven>
<description>
- Listens for serialized commands and routes them to the command service session bean.
+ Listens for serialized commands and runs them through the command service.
</description>
<display-name>jBPM Command Listener</display-name>
<ejb-name>CommandListenerBean</ejb-name>
@@ -48,7 +57,7 @@
<resource-ref>
<description>
- Logical name of the factory that provides JMS connections for producing result messages.
+ Logical name of the factory that provides JMS connections for sending result messages.
Required for command messages that indicate a reply destination.
</description>
<res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
@@ -58,21 +67,44 @@
<message-driven>
<description>
- Listens for job references and delegates execution of the referenced Job to the command
- service session bean.
+ Listens for async job references and runs the jobs through the command service.
</description>
- <display-name>jBPM Job Listener</display-name>
- <ejb-name>JobListenerBean</ejb-name>
+ <display-name>jBPM Async Job Listener</display-name>
+ <ejb-name>AsyncJobListenerBean</ejb-name>
<mapped-name>queue/JbpmJobQueue</mapped-name>
+ <ejb-class>org.jbpm.ejb.JobListenerBean</ejb-class>
+ <messaging-type>javax.jms.MessageListener</messaging-type>
+ <message-destination-type>javax.jms.Queue</message-destination-type>
<resource-ref>
<description>
- Logical name of the factory that provides JMS connections for producing result messages.
+ Logical name of the factory that provides JMS connections for sending result messages.
Required for command messages that indicate a reply destination.
</description>
<res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
<mapped-name>java:JmsXA</mapped-name>
</resource-ref>
</message-driven>
+
+ <message-driven>
+ <description>
+ Listens for timer references and runs the timers through the command service.
+ </description>
+ <display-name>jBPM Timer Listener</display-name>
+ <ejb-name>TimerListenerBean</ejb-name>
+ <mapped-name>queue/JbpmTimerQueue</mapped-name>
+ <ejb-class>org.jbpm.ejb.JobListenerBean</ejb-class>
+ <messaging-type>javax.jms.MessageListener</messaging-type>
+ <message-destination-type>javax.jms.Queue</message-destination-type>
+
+ <resource-ref>
+ <description>
+ Logical name of the factory that provides JMS connections for sending result messages.
+ Required for command messages that indicate a reply destination.
+ </description>
+ <res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
+ <mapped-name>java:JmsXA</mapped-name>
+ </resource-ref>
+ </message-driven>
</enterprise-beans>
</ejb-jar>
More information about the jbpm-commits
mailing list