[jbpm-commits] JBoss JBPM SVN: r6687 - in jbpm3/branches/jbpm-3.2-soa: enterprise/src/main/java/org/jbpm/ejb/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 28 05:17:12 EDT 2010


Author: alex.guizar at jboss.com
Date: 2010-09-28 05:17:11 -0400 (Tue, 28 Sep 2010)
New Revision: 6687

Added:
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java
Removed:
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java
Modified:
   jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
Log:
JBPM-2945 acquire a jms connection each time one is needed instead of reusing a cached connection;
employ separate queues for timers and other jobs to prevent timers from falling behind

Modified: jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/distribution/src/main/resources/destination/jbpm-jbm-service.xml	2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,10 +1,10 @@
 <?xml version="1.0" encoding="UTF-8"?>
 
 <!-- 
-  This file defines the default Queues and Topics that jBPM ships with.
-  The default Queues and Topics are used by the Command Listener Bean
-  and the producer-consumer pair formed by the JMS Message Service and
-  the Job Listener Bean.
+  This file defines the default queues that jBPM ships with.
+  The default queues are used by the Command Listener Bean
+  and the producer-consumer pair formed by the JMS Connector Service and
+  the Job/Timer Listener Bean.
 
   You can add other destinations to this file, or you can create other
   *-service.xml files to contain your application's destinations.
@@ -23,6 +23,14 @@
   </mbean>
 
   <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.messaging.destination:service=Queue,name=JbpmTimerQueue"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+    <depends>jboss.messaging:service=PostOffice</depends>
+    <attribute name="RedeliveryDelay">1000</attribute>
+  </mbean>
+
+  <mbean code="org.jboss.jms.server.destination.QueueService"
     name="jboss.messaging.destination:service=Queue,name=JbpmCommandQueue"
     xmbean-dd="xmdesc/Queue-xmbean.xml">
     <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -93,9 +93,7 @@
 
   private MessageDrivenContext messageDrivenContext;
   private LocalCommandService commandService;
-
   private ConnectionFactory jmsConnectionFactory;
-  private Connection jmsConnection;
 
   private static final Log log = LogFactory.getLog(CommandListenerBean.class);
 
@@ -104,18 +102,13 @@
       // extract command from message
       Command command = extractCommand(message);
       if (command == null) return;
+
+      // execute command via local command executor bean
+      Object result;
       try {
-        // execute command via local command executor bean
-        Object result = commandService.execute(command);
-        // send a response back if a "reply to" destination is set
-        Destination replyTo = message.getJMSReplyTo();
-        if (replyTo != null && (result instanceof Serializable || result == null)) {
-          sendResult((Serializable) result, replyTo, message.getJMSMessageID());
-        }
+        result = commandService.execute(command);
       }
       catch (RuntimeException e) {
-        // MDBs are not supposed to throw exceptions
-        messageDrivenContext.setRollbackOnly();
         // if this is a locking exception, keep it quiet
         if (DbPersistenceService.isLockingException(e)) {
           StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
@@ -124,7 +117,18 @@
         else {
           log.error("failed to execute " + command, e);
         }
+        // MDBs are not supposed to throw exceptions
+        messageDrivenContext.setRollbackOnly();
+        return;
       }
+
+      // send the result back if a "reply-to" destination is set
+      Destination replyTo;
+      if (jmsConnectionFactory != null
+        && (replyTo = message.getJMSReplyTo()) != null
+        && (result instanceof Serializable || result == null)) {
+        sendResult((Serializable) result, replyTo, message.getJMSMessageID());
+      }
     }
     catch (JMSException e) {
       messageDrivenContext.setRollbackOnly();
@@ -152,44 +156,28 @@
   private void sendResult(Serializable result, Destination destination, String correlationId)
     throws JMSException {
     if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
-    Session jmsSession = createSession();
+    Connection jmsConnection = jmsConnectionFactory.createConnection();
     try {
+      /*
+       * if the connection supports xa, the session will be transacted, else the session will
+       * auto acknowledge; in either case no explicit transaction control must be performed -
+       * see ejb 2.1 - 17.3.5
+       */
+      Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Message resultMessage = jmsSession.createObjectMessage(result);
       resultMessage.setJMSCorrelationID(correlationId);
       jmsSession.createProducer(destination).send(resultMessage);
     }
     finally {
-      jmsSession.close();
+      jmsConnection.close();
     }
   }
 
-  private Session createSession() throws JMSException {
-    if (jmsConnection == null) {
-      // create jms connection
-      jmsConnection = jmsConnectionFactory.createConnection();
-    }
-    /*
-     * if the connection supports xa, the session will be transacted, else the session will auto
-     * acknowledge; in either case no explicit transaction control must be performed - see ejb
-     * 2.1 - 17.3.5
-     */
-    return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-  }
-
   public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
     this.messageDrivenContext = messageDrivenContext;
   }
 
   public void ejbRemove() {
-    if (jmsConnection != null) {
-      try {
-        jmsConnection.close();
-      }
-      catch (JMSException e) {
-        log.debug("failed to close jms connection", e);
-      }
-      jmsConnection = null;
-    }
     jmsConnectionFactory = null;
     commandService = null;
     messageDrivenContext = null;
@@ -198,18 +186,23 @@
   public void ejbCreate() {
     try {
       Context jndiContext = new InitialContext();
-      jmsConnectionFactory = (ConnectionFactory) jndiContext.lookup("java:comp/env/jms/JbpmConnectionFactory");
-
+      // create command service
       LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome) jndiContext.lookup("java:comp/env/ejb/LocalCommandServiceBean");
       commandService = commandServiceHome.create();
-
+      // look for optional connection factory
+      try {
+        jmsConnectionFactory = (ConnectionFactory) jndiContext.lookup("java:comp/env/jms/JbpmConnectionFactory");
+      }
+      catch (NamingException e) {
+        log.warn("error retrieving connection factory, results will not be sent back", e);
+      }
       jndiContext.close();
     }
     catch (NamingException e) {
       throw new JbpmException("error retrieving command service home", e);
     }
     catch (CreateException e) {
-      throw new JbpmException("error creating command service", e);
+      throw new JbpmException("failed to create command service", e);
     }
   }
 }

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml	2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,22 +1,26 @@
 <jbpm-configuration>
 
-  <bean name="jbpm.jms.connector.service" class="org.jbpm.jms.JmsConnectorServiceFactory"
-    singleton="true">
-    <field name="jbpmConfiguration">
-      <ref bean="jbpm.configuration" />
-    </field>
-  </bean>
-
   <jbpm-context>
     <service name="persistence" factory="org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory" />
     <service name="message">
       <factory>
-        <ref bean="jbpm.jms.connector.service" />
+        <bean class="org.jbpm.jms.JmsConnectorServiceFactory" singleton="true">
+          <field name="jbpmConfiguration">
+            <ref bean="jbpm.configuration" />
+          </field>
+        </bean>
       </factory>
     </service>
     <service name="scheduler">
       <factory>
-        <ref bean="jbpm.jms.connector.service" />
+        <bean class="org.jbpm.jms.JmsConnectorServiceFactory" singleton="true">
+          <field name="jbpmConfiguration">
+            <ref bean="jbpm.configuration" />
+          </field>
+          <field name="destinationJndiName">
+            <string value="java:comp/env/jms/TimerQueue" />
+          </field>
+        </bean>
       </factory>
     </service>
     <service name="tx" factory="org.jbpm.tx.TxServiceFactory" />

Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java (from rev 6686, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandListenerBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.ejb;
+
+import java.io.Serializable;
+
+import javax.annotation.Resource;
+import javax.ejb.ActivationConfigProperty;
+import javax.ejb.EJB;
+import javax.ejb.MessageDriven;
+import javax.ejb.MessageDrivenContext;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.command.Command;
+import org.jbpm.persistence.db.DbPersistenceService;
+import org.jbpm.persistence.db.StaleObjectLogConfigurer;
+
+/**
+ * This message-driven bean listens for {@link ObjectMessage object messages} containing a
+ * command instance. The received commands are executed by the {@link CommandServiceBean command
+ * service} bean, using the local interface.
+ * 
+ * The body of the message must be a Java object that implements the {@link Command} interface.
+ * The message properties, if any, are ignored.
+ * 
+ * <h3>Environment</h3>
+ * 
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
+ * 
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td><code>ejb/LocalCommandService</code></td>
+ * <td>EJB Reference</td>
+ * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on
+ * a separate jBPM context.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JbpmConnectionFactory</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the factory that provides JMS connections for producing result messages.
+ * Required for command messages that indicate a reply destination.</td>
+ * </tr>
+ * </table>
+ * 
+ * @author Alejandro Guizar
+ */
+@MessageDriven(activationConfig = {
+  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
+})
+public class CommandListenerBean implements MessageListener {
+
+  private static final long serialVersionUID = 1L;
+
+  @Resource
+  private MessageDrivenContext messageDrivenContext;
+  @EJB(name = "ejb/LocalCommandService")
+  private LocalCommandService commandService;
+  @Resource(name = "jms/JbpmConnectionFactory", shareable = true)
+  private ConnectionFactory jmsConnectionFactory;
+
+  private static final Log log = LogFactory.getLog(CommandListenerBean.class);
+
+  public void onMessage(Message message) {
+    try {
+      // extract command from message
+      Command command = extractCommand(message);
+      if (command == null) return;
+
+      // execute command via local command executor bean
+      Object result;
+      try {
+        result = commandService.execute(command);
+      }
+      catch (RuntimeException e) {
+        // if this is a locking exception, keep it quiet
+        if (DbPersistenceService.isLockingException(e)) {
+          StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
+            + command, e);
+        }
+        else {
+          log.error("failed to execute " + command, e);
+        }
+        // MDBs are not supposed to throw exceptions
+        messageDrivenContext.setRollbackOnly();
+        return;
+      }
+
+      // send a response back if a "reply to" destination is set
+      Destination replyTo;
+      if (jmsConnectionFactory != null
+        && (replyTo = message.getJMSReplyTo()) != null
+        && (result instanceof Serializable || result == null)) {
+        sendResult((Serializable) result, replyTo, message.getJMSMessageID());
+      }
+    }
+    catch (JMSException e) {
+      messageDrivenContext.setRollbackOnly();
+      log.error("failed to process message " + message, e);
+    }
+  }
+
+  /**
+   * Retrieves a {@link Command} instance from the given message, which is assumed to be an
+   * {@link ObjectMessage}.
+   * <p>
+   * Subclasses may override this method to materialize the command in some other way.
+   * </p>
+   */
+  protected Command extractCommand(Message message) throws JMSException {
+    if (message instanceof ObjectMessage) {
+      ObjectMessage objectMessage = (ObjectMessage) message;
+      Serializable object = objectMessage.getObject();
+      if (object instanceof Command) {
+        return (Command) object;
+      }
+      else {
+        log.warn("not a command: " + object);
+      }
+    }
+    else {
+      log.warn("not an object message: " + message);
+    }
+    return null;
+  }
+
+  private void sendResult(Serializable result, Destination destination, String correlationId)
+    throws JMSException {
+    if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
+    Connection jmsConnection = jmsConnectionFactory.createConnection();
+    try {
+      /*
+       * if the connection supports xa, the session will be transacted, else the session will
+       * auto acknowledge; in either case no explicit transaction control must be performed -
+       * see ejb 2.1 - 17.3.5
+       */
+      Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Message resultMessage = jmsSession.createObjectMessage(result);
+      resultMessage.setJMSCorrelationID(correlationId);
+      jmsSession.createProducer(destination).send(resultMessage);
+    }
+    finally {
+      jmsConnection.close();
+    }
+  }
+}

Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java (from rev 6683, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/CommandServiceBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jbpm.ejb;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import javax.annotation.Resources;
+import javax.ejb.SessionContext;
+import javax.ejb.Stateless;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.sql.DataSource;
+
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
+import org.jbpm.JbpmException;
+import org.jbpm.command.Command;
+import org.jbpm.jms.JmsConnectorServiceFactory;
+import org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory;
+
+/**
+ * Stateless session bean that executes {@linkplain Command commands} by calling their
+ * {@link Command#execute(JbpmContext) execute} method on a separate {@link JbpmContext jBPM
+ * context}.
+ * 
+ * <h3>Environment</h3>
+ * 
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
+ * 
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td><code>JbpmCfgResource</code></td>
+ * <td>Environment Entry</td>
+ * <td>The classpath resource from which to read the {@linkplain JbpmConfiguration jBPM
+ * configuration}. Optional, defaults to <code>
+ * jbpm.cfg.xml</code>.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jdbc/JbpmDataSource</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the data source that provides JDBC connections to the
+ * {@linkplain JtaDbPersistenceServiceFactory persistence service}. Must match the
+ * <code>hibernate.connection.datasource</code> property in the Hibernate configuration file.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JbpmConnectionFactory</code></td>
+ * <td>Resource Manager Reference</td>
+ * <td>Logical name of the factory that provides JMS connections to the
+ * {@linkplain JmsConnectorServiceFactory JMS connector service}. Required for processes that
+ * contain asynchronous continuations.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/JobQueue</code></td>
+ * <td>Message Destination Reference</td>
+ * <td>The message service sends job messages to the queue referenced here. Must be
+ * the same queue from which the {@linkplain JobListenerBean job listener bean} receives
+ * messages.</td>
+ * </tr>
+ * <tr>
+ * <td><code>jms/TimerQueue</code></td>
+ * <td>Message Destination Reference</td>
+ * <td>The scheduler service sends timer messages to the queue referenced here. Must be the
+ * same queue from which the {@linkplain TimerListenerBean timer listener bean} receives
+ * messages.</td>
+ * </tr>
+ * </table>
+ * 
+ * @author Alejandro Guizar
+ */
+@Stateless
+@Resources(value = {
+  @Resource(name = "jdbc/JbpmDataSource", type = DataSource.class, shareable = true),
+  @Resource(name = "jms/JbpmConnectionFactory", type = ConnectionFactory.class, shareable = true),
+  @Resource(name = "jms/JobQueue", type = Queue.class),
+  @Resource(name = "jms/TimerQueue", type = Queue.class)
+})
+public class CommandServiceBean implements LocalCommandService {
+
+  private static final long serialVersionUID = 1L;
+
+  @Resource
+  private SessionContext sessionContext;
+  @Resource(name = "JbpmCfgResource")
+  private String jbpmCfgResource;
+
+  private JbpmConfiguration jbpmConfiguration;
+
+  /**
+   * Creates the {@link JbpmConfiguration} to be used by this command service. In case the
+   * environment key <code>JbpmCfgResource</code> is specified, that value is interpreted as the
+   * name of the configuration resource to load from the classpath. If that key is absent, the
+   * default configuration file will be used (jbpm.cfg.xml).
+   */
+  @PostConstruct
+  void createConfiguration() {
+    jbpmConfiguration = JbpmConfiguration.getInstance(jbpmCfgResource);
+  }
+
+  public Object execute(Command command) {
+    JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
+    try {
+      Object result = command.execute(jbpmContext);
+      // check whether command requested a rollback
+      if (jbpmContext.getServices().getTxService().isRollbackOnly()) {
+        sessionContext.setRollbackOnly();
+      }
+      return result;
+    }
+    catch (RuntimeException e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw new JbpmException("failed to execute " + command, e);
+    }
+    finally {
+      jbpmContext.close();
+    }
+  }
+}

Copied: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java (from rev 6684, jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java)
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java	                        (rev 0)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/JobListenerBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -0,0 +1,43 @@
+package org.jbpm.ejb;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.jbpm.command.Command;
+import org.jbpm.jms.ExecuteJobCommand;
+
+/**
+ * Message-driven bean that listens for {@link Message messages} containing a reference to a
+ * pending {@linkplain org.jbpm.job.Job job} for asynchronous continuations.
+ * <p>
+ * The message must have a <code>long</code> property called <code>jobId</code> which identifies
+ * a job in the database. The message body, if any, is ignored.
+ * </p>
+ * <h3>Environment</h3>
+ * <p>
+ * This bean inherits its environment entries and resources available for customization from
+ * {@link CommandListenerBean}.
+ * </p>
+ * 
+ * @author Alejandro Guizar
+ */
+public class JobListenerBean extends CommandListenerBean {
+
+  private static final long serialVersionUID = 1L;
+  private static final Log log = LogFactory.getLog(JobListenerBean.class);
+
+  protected Command extractCommand(Message message) throws JMSException {
+    // checking for jobId property
+    if (message.propertyExists("jobId")) {
+      long jobId = message.getLongProperty("jobId");
+      return new ExecuteJobCommand(jobId);
+    }
+    else {
+      log.warn("property jobId not found");
+    }
+    return null;
+  }
+}

Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,214 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.ejb.impl;
-
-import java.io.Serializable;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.EJB;
-import javax.ejb.MessageDriven;
-import javax.ejb.MessageDrivenContext;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmException;
-import org.jbpm.command.Command;
-import org.jbpm.ejb.LocalCommandService;
-import org.jbpm.persistence.db.DbPersistenceService;
-import org.jbpm.persistence.db.StaleObjectLogConfigurer;
-
-/**
- * This message-driven bean listens for {@link ObjectMessage object messages} containing a
- * command instance. The received commands are executed by the {@link CommandServiceBean command
- * service} bean, using the local interface.
- * 
- * The body of the message must be a Java object that implements the {@link Command} interface.
- * The message properties, if any, are ignored.
- * 
- * <h3>Environment</h3>
- * 
- * <p>
- * The environment entries and resources available for customization are summarized in the table
- * below.
- * </p>
- * 
- * <table border="1">
- * <tr>
- * <th>Name</th>
- * <th>Type</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td><code>ejb/LocalCommandService</code></td>
- * <td>EJB Reference</td>
- * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on
- * a separate jBPM context.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JbpmConnectionFactory</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections for producing result messages.
- * Required for command messages that indicate a reply destination.</td>
- * </tr>
- * </table>
- * 
- * @author Alejandro Guizar
- */
-@MessageDriven(activationConfig = {
-  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
-})
-public class CommandListenerBean implements MessageListener {
-
-  private static final long serialVersionUID = 1L;
-
-  @Resource
-  private MessageDrivenContext messageDrivenContext;
-  @EJB(name = "ejb/LocalCommandService")
-  private LocalCommandService commandService;
-  @Resource(name = "jms/JbpmConnectionFactory")
-  private ConnectionFactory jmsConnectionFactory;
-
-  private Connection jmsConnection;
-
-  private static final Log log = LogFactory.getLog(CommandListenerBean.class);
-
-  @PostConstruct
-  void createConnection() {
-    try {
-      jmsConnection = jmsConnectionFactory.createConnection();
-    }
-    catch (JMSException e) {
-      throw new JbpmException("failed to create jms connection", e);
-    }
-  }
-
-  @PreDestroy
-  void closeConnection() {
-    if (jmsConnection != null) {
-      try {
-        jmsConnection.close();
-      }
-      catch (JMSException e) {
-        log.debug("failed to close jms connection", e);
-      }
-    }
-  }
-
-  public void onMessage(Message message) {
-    try {
-      // extract command from message
-      Command command = extractCommand(message);
-      if (command == null) return;
-
-      // execute command via local command executor bean
-      Object result;
-      try {
-        result = commandService.execute(command);
-      }
-      catch (RuntimeException e) {
-        // MDBs are not supposed to throw exceptions
-        messageDrivenContext.setRollbackOnly();
-        // if this is a locking exception, keep it quiet
-        if (DbPersistenceService.isLockingException(e)) {
-          StaleObjectLogConfigurer.getStaleObjectExceptionsLog().error("failed to execute "
-            + command, e);
-        }
-        else {
-          log.error("failed to execute " + command, e);
-        }
-        // the exception becomes the result
-        result = e;
-      }
-      // send a response back if a "reply to" destination is set
-      Destination replyTo = message.getJMSReplyTo();
-      if (replyTo != null && (result instanceof Serializable || result == null)) {
-        sendResult((Serializable) result, replyTo, message.getJMSMessageID());
-      }
-    }
-    catch (JMSException e) {
-      messageDrivenContext.setRollbackOnly();
-      log.error("failed to process message " + message, e);
-    }
-  }
-
-  /**
-   * Retrieves a {@link Command} instance from the given message, which is assumed to be an
-   * {@link ObjectMessage}.
-   * <p>
-   * Subclasses may override this method to materialize the command in some other way.
-   * </p>
-   */
-  protected Command extractCommand(Message message) throws JMSException {
-    if (message instanceof ObjectMessage) {
-      ObjectMessage objectMessage = (ObjectMessage) message;
-      Serializable object = objectMessage.getObject();
-      if (object instanceof Command) {
-        return (Command) object;
-      }
-      else {
-        log.warn("not a command: " + object);
-      }
-    }
-    else {
-      log.warn("not an object message: " + message);
-    }
-    return null;
-  }
-
-  private void sendResult(Serializable result, Destination destination, String correlationId)
-    throws JMSException {
-    if (log.isDebugEnabled()) log.debug("sending " + result + " to " + destination);
-    Session jmsSession = createSession();
-    try {
-      Message resultMessage = jmsSession.createObjectMessage(result);
-      resultMessage.setJMSCorrelationID(correlationId);
-      jmsSession.createProducer(destination).send(resultMessage);
-    }
-    finally {
-      jmsSession.close();
-    }
-  }
-
-  private Session createSession() throws JMSException {
-    if (jmsConnection == null) {
-      jmsConnection = jmsConnectionFactory.createConnection();
-    }
-    /*
-     * if the connection supports xa, the session will be transacted, else the session will auto
-     * acknowledge; in either case no explicit transaction control must be performed - see ejb
-     * 2.1 - 17.3.5
-     */
-    return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-  }
-}

Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/CommandServiceBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jbpm.ejb.impl;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import javax.annotation.Resources;
-import javax.ejb.SessionContext;
-import javax.ejb.Stateless;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.sql.DataSource;
-
-import org.jbpm.JbpmConfiguration;
-import org.jbpm.JbpmContext;
-import org.jbpm.JbpmException;
-import org.jbpm.command.Command;
-import org.jbpm.ejb.LocalCommandService;
-import org.jbpm.jms.JmsConnectorServiceFactory;
-import org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory;
-
-/**
- * Stateless session bean that executes {@linkplain Command commands} by calling their
- * {@link Command#execute(JbpmContext) execute} method on a separate {@link JbpmContext jBPM
- * context}.
- * 
- * <h3>Environment</h3>
- * 
- * <p>
- * The environment entries and resources available for customization are summarized in the table
- * below.
- * </p>
- * 
- * <table border="1">
- * <tr>
- * <th>Name</th>
- * <th>Type</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td><code>JbpmCfgResource</code></td>
- * <td>Environment Entry</td>
- * <td>The classpath resource from which to read the {@linkplain JbpmConfiguration jBPM
- * configuration}. Optional, defaults to <code>
- * jbpm.cfg.xml</code>.</td>
- * </tr>
- * <tr>
- * <td><code>jdbc/JbpmDataSource</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the data source that provides JDBC connections to the
- * {@linkplain JtaDbPersistenceServiceFactory persistence service}. Must match the
- * <code>hibernate.connection.datasource</code> property in the Hibernate configuration file.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JbpmConnectionFactory</code></td>
- * <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections to the
- * {@linkplain JmsConnectorServiceFactory JMS connector service}. Required for processes that
- * contain asynchronous continuations.</td>
- * </tr>
- * <tr>
- * <td><code>jms/JobQueue</code></td>
- * <td>Message Destination Reference</td>
- * <td>The message service sends job messages to the queue referenced here. To ensure this is
- * the same queue from which the {@linkplain JobListenerBean job listener bean} receives
- * messages, the <code>message-destination-link</code> points to a common logical destination,
- * <code>JobQueue</code>.</td>
- * </tr>
- * </table>
- * 
- * @author Alejandro Guizar
- */
-@Stateless
-@Resources(value = {
-  @Resource(name = "jdbc/JbpmDataSource", type = DataSource.class),
-  @Resource(name = "jms/JbpmConnectionFactory", type = ConnectionFactory.class),
-  @Resource(name = "jms/JobQueue", type = Queue.class)
-})
-public class CommandServiceBean implements LocalCommandService {
-
-  private static final long serialVersionUID = 1L;
-
-  @Resource
-  private SessionContext sessionContext;
-  @Resource(name = "JbpmCfgResource")
-  private String jbpmCfgResource;
-
-  private JbpmConfiguration jbpmConfiguration;
-
-  /**
-   * Creates the {@link JbpmConfiguration} to be used by this command service. In case the
-   * environment key <code>JbpmCfgResource</code> is specified, that value is interpreted as the
-   * name of the configuration resource to load from the classpath. If that key is absent, the
-   * default configuration file will be used (jbpm.cfg.xml).
-   */
-  @PostConstruct
-  void createConfiguration() {
-    jbpmConfiguration = JbpmConfiguration.getInstance(jbpmCfgResource);
-  }
-
-  public Object execute(Command command) {
-    JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
-    try {
-      Object result = command.execute(jbpmContext);
-      // check whether command requested a rollback
-      if (jbpmContext.getServices().getTxService().isRollbackOnly()) {
-        sessionContext.setRollbackOnly();
-      }
-      return result;
-    }
-    catch (RuntimeException e) {
-      throw e;
-    }
-    catch (Exception e) {
-      throw new JbpmException("failed to execute " + command, e);
-    }
-    finally {
-      jbpmContext.close();
-    }
-  }
-}

Deleted: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/ejb/impl/JobListenerBean.java	2010-09-28 09:17:11 UTC (rev 6687)
@@ -1,49 +0,0 @@
-package org.jbpm.ejb.impl;
-
-import javax.ejb.ActivationConfigProperty;
-import javax.ejb.MessageDriven;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.command.Command;
-import org.jbpm.jms.ExecuteJobCommand;
-
-/**
- * Message-driven bean that listens for {@link Message messages} containing a reference to a
- * pending {@linkplain org.jbpm.job.Job job} for asynchronous continuations.
- * <p>
- * The message must have a <code>long</code> property called <code>jobId</code> which identifies
- * a job in the database. The message body, if any, is ignored.
- * </p>
- * <h3>Environment</h3>
- * <p>
- * This bean inherits its environment entries and resources available for customization from
- * {@link CommandListenerBean}.
- * </p>
- * 
- * @author Alejandro Guizar
- */
-@MessageDriven(messageListenerInterface = MessageListener.class, activationConfig = {
-  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
-})
-public class JobListenerBean extends CommandListenerBean {
-
-  private static final long serialVersionUID = 1L;
-  private static final Log log = LogFactory.getLog(JobListenerBean.class);
-
-  protected Command extractCommand(Message message) throws JMSException {
-    // checking for jobId property
-    if (message.propertyExists("jobId")) {
-      long jobId = message.getLongProperty("jobId");
-      return new ExecuteJobCommand(jobId);
-    }
-    else {
-      log.warn("property jobId not found");
-    }
-    return null;
-  }
-}

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml	2010-09-27 21:40:46 UTC (rev 6686)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/resources/META-INF/ejb-jar.xml	2010-09-28 09:17:11 UTC (rev 6687)
@@ -28,19 +28,28 @@
 
       <message-destination-ref>
         <description>
-          The message service sends job messages to the queue referenced here. To ensure this is the
-          same queue from which the JobListenerBean receives messages, the message-destination-link
-          element points to a common logical destination, JobQueue.
+          The message service sends job messages to the queue referenced here.
+          Must be the same queue from which the JobListenerBean receives messages.
         </description>
         <message-destination-ref-name>jms/JobQueue</message-destination-ref-name>
         <message-destination-usage>Produces</message-destination-usage>
         <mapped-name>queue/JbpmJobQueue</mapped-name>
       </message-destination-ref>
+
+      <message-destination-ref>
+        <description>
+          The scheduler service sends timer messages to the queue referenced here.
+          Must be the same queue from which the TimerListenerBean receives messages.
+        </description>
+        <message-destination-ref-name>jms/TimerQueue</message-destination-ref-name>
+        <message-destination-usage>Produces</message-destination-usage>
+        <mapped-name>queue/JbpmTimerQueue</mapped-name>
+      </message-destination-ref>
     </session>
 
     <message-driven>
       <description>
-        Listens for serialized commands and routes them to the command service session bean.
+        Listens for serialized commands and runs them through the command service.
       </description>
       <display-name>jBPM Command Listener</display-name>
       <ejb-name>CommandListenerBean</ejb-name>
@@ -48,7 +57,7 @@
 
       <resource-ref>
         <description>
-          Logical name of the factory that provides JMS connections for producing result messages.
+          Logical name of the factory that provides JMS connections for sending result messages.
           Required for command messages that indicate a reply destination.
         </description>
         <res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
@@ -58,21 +67,44 @@
 
     <message-driven>
       <description>
-        Listens for job references and delegates execution of the referenced Job to the command
-        service session bean.
+        Listens for async job references and runs the jobs through the command service.
       </description>
-      <display-name>jBPM Job Listener</display-name>
-      <ejb-name>JobListenerBean</ejb-name>
+      <display-name>jBPM Async Job Listener</display-name>
+      <ejb-name>AsyncJobListenerBean</ejb-name>
       <mapped-name>queue/JbpmJobQueue</mapped-name>
+      <ejb-class>org.jbpm.ejb.JobListenerBean</ejb-class>
+      <messaging-type>javax.jms.MessageListener</messaging-type>
+      <message-destination-type>javax.jms.Queue</message-destination-type>
 
       <resource-ref>
         <description>
-          Logical name of the factory that provides JMS connections for producing result messages.
+          Logical name of the factory that provides JMS connections for sending result messages.
           Required for command messages that indicate a reply destination.
         </description>
         <res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
         <mapped-name>java:JmsXA</mapped-name>
       </resource-ref>
     </message-driven>
+
+    <message-driven>
+      <description>
+        Listens for timer references and runs the timers through the command service.
+      </description>
+      <display-name>jBPM Timer Listener</display-name>
+      <ejb-name>TimerListenerBean</ejb-name>
+      <mapped-name>queue/JbpmTimerQueue</mapped-name>
+      <ejb-class>org.jbpm.ejb.JobListenerBean</ejb-class>
+      <messaging-type>javax.jms.MessageListener</messaging-type>
+      <message-destination-type>javax.jms.Queue</message-destination-type>
+
+      <resource-ref>
+        <description>
+          Logical name of the factory that provides JMS connections for sending result messages.
+          Required for command messages that indicate a reply destination.
+        </description>
+        <res-ref-name>jms/JbpmConnectionFactory</res-ref-name>
+        <mapped-name>java:JmsXA</mapped-name>
+      </resource-ref>
+    </message-driven>
   </enterprise-beans>
 </ejb-jar>



More information about the jbpm-commits mailing list