[jbpm-commits] JBoss JBPM SVN: r5441 - in jbpm3/branches/jbpm-3.2-soa/modules: core/src/main/java/org/jbpm/persistence/db and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 7 04:54:14 EDT 2009


Author: alex.guizar at jboss.com
Date: 2009-08-07 04:54:13 -0400 (Fri, 07 Aug 2009)
New Revision: 5441

Modified:
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
   jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
   jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
   jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
   jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
   jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
   jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java
   jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml
Log:
[JBPM-2472] Make JMS session creation configurable

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -31,38 +31,34 @@
 import org.jbpm.msg.MessageService;
 
 public class DbMessageService implements MessageService {
-  
+
   private static final long serialVersionUID = 1L;
 
-  JobSession jobSession = null;
-  JobExecutor jobExecutor = null;
-  boolean hasProducedJobs = false;
-  
+  final JobSession jobSession;
+  final JobExecutor jobExecutor;
+  boolean hasProducedJobs;
+
   public DbMessageService() {
     JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext==null) {
-      throw new JbpmException("instantiation of the DbMessageService requires a current JbpmContext");
-    }
+    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+
     jobSession = jbpmContext.getJobSession();
     jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
   }
 
   public void send(Job job) {
     jobSession.saveJob(job);
-    log.debug("saved "+job);
     hasProducedJobs = true;
   }
 
   public void close() {
-    if ( (hasProducedJobs)
-         && (jobExecutor!=null)
-       ) {
-      log.debug("messages were produced, job executor will be signalled");
-      synchronized(jobExecutor) {
+    if (hasProducedJobs && jobExecutor != null) {
+      log.debug("messages were produced, job executor will be notified");
+      synchronized (jobExecutor) {
         jobExecutor.notify();
       }
     }
   }
-  
+
   private static Log log = LogFactory.getLog(DbMessageService.class);
 }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -58,7 +58,7 @@
   protected boolean mustConnectionBeClosed;
 
   protected Transaction transaction;
-  protected boolean isTransactionEnabled = true;
+  protected boolean isTransactionEnabled;
   protected boolean isCurrentSessionEnabled;
 
   protected Session session;
@@ -156,10 +156,11 @@
 
   public Connection getConnection(boolean resolveSession) {
     if (connection == null) {
-      if (persistenceServiceFactory.getDataSource() != null) {
+      DataSource dataSource = persistenceServiceFactory.getDataSource();
+      if (dataSource != null) {
         try {
-          log.debug("establishing connneciton to data source");
-          connection = persistenceServiceFactory.getDataSource().getConnection();
+          log.debug("establishing connection to data source");
+          connection = dataSource.getConnection();
           mustConnectionBeClosed = true;
         }
         catch (SQLException e) {
@@ -257,7 +258,7 @@
         session.flush();
       }
       catch (HibernateException e) {
-        // avoid log and throw antipatternf
+        // avoid log and throw antipattern
         return e;
       }
     }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -30,14 +30,13 @@
 import org.jbpm.svc.Service;
 
 /**
- * The JTA persistence service enables jBPM to participate in JTA transactions.
- * If an existing transaction is underway, {@link JtaDbPersistenceService} 
- * clings to it; otherwise it starts a new transaction.
+ * The JTA persistence service enables jBPM to participate in JTA transactions. If an existing
+ * transaction is underway, {@link JtaDbPersistenceService} clings to it; otherwise it starts a new
+ * transaction.
  * 
  * <h3>Configuration</h3>
  * 
- * The JTA persistence service factory has the configurable fields described
- * below.
+ * The JTA persistence service factory has the configurable fields described below.
  * 
  * <ul>
  * <li><code>isCurrentSessionEnabled</code></li>
@@ -47,46 +46,44 @@
  * Refer to the jBPM manual for details.
  * 
  * @author Tom Baeyens
+ * @author Alejandro Guizar
  */
-public class JtaDbPersistenceServiceFactory extends DbPersistenceServiceFactory
-{
+public class JtaDbPersistenceServiceFactory extends DbPersistenceServiceFactory {
 
   private static final long serialVersionUID = 1L;
 
   private UserTransaction userTransaction;
 
-  public JtaDbPersistenceServiceFactory()
-  {
+  public JtaDbPersistenceServiceFactory() {
     setCurrentSessionEnabled(true);
     setTransactionEnabled(false);
   }
 
-  public Service openService()
-  {
+  public Service openService() {
     return new JtaDbPersistenceService(this);
   }
 
-  public UserTransaction getUserTransaction()
-  {
-    if (userTransaction == null)
-    {
+  public UserTransaction getUserTransaction() {
+    if (userTransaction == null) {
       String jndiName = getConfiguration().getProperty("jta.UserTransaction");
-      if (jndiName == null)
-      {
+      if (jndiName == null) {
         /*
-         * EJB 2.1 section 20.9 The container must make the UserTransaction interface available to the enterprise beans that are allowed to use this interface (only
-         * session and message- driven beans with bean-managed transaction demarcation are allowed to use this interface) in JNDI under the name
-         * java:comp/UserTransaction. J2EE 1.4 section 4.2.1.1 The J2EE platform must provide an object implementing the UserTransaction interface to all web
-         * components. The platform must publish the UserTransaction object in JNDI under the name java:comp/UserTransaction.
+         * EJB 2.1 section 20.9 The container must make the UserTransaction interface available to
+         * the enterprise beans that are allowed to use this interface (only session and message-
+         * driven beans with bean-managed transaction demarcation are allowed to use this interface)
+         * in JNDI under the name java:comp/UserTransaction.
          */
+        /*
+         * J2EE 1.4 section 4.2.1.1 The J2EE platform must provide an object implementing the
+         * UserTransaction interface to all web components. The platform must publish the
+         * UserTransaction object in JNDI under the name java:comp/UserTransaction.
+         */
         jndiName = "java:comp/UserTransaction";
       }
-      try
-      {
-        userTransaction = (UserTransaction)new InitialContext().lookup(jndiName);
+      try {
+        userTransaction = (UserTransaction) new InitialContext().lookup(jndiName);
       }
-      catch (NamingException e)
-      {
+      catch (NamingException e) {
         throw new JbpmException("could not retrieve user transaction with name " + jndiName, e);
       }
     }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -37,20 +37,19 @@
   private static final long serialVersionUID = 1L;
 
   private static final Log log = LogFactory.getLog(DbSchedulerService.class);
-  
-  JobSession jobSession = null;
-  JobExecutor jobExecutor = null;
-  boolean hasProducedJobs = false;
 
+  final JobSession jobSession;
+  final JobExecutor jobExecutor;
+  boolean hasProducedJobs;
+
   public DbSchedulerService() {
     JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext==null) {
-      throw new JbpmException("instantiation of the DbSchedulerService requires a current JbpmContext");
-    }
+    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+
     this.jobSession = jbpmContext.getJobSession();
     this.jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
   }
-  
+
   public void createTimer(Timer timerJob) {
     jobSession.saveJob(timerJob);
     hasProducedJobs = true;
@@ -70,7 +69,7 @@
 
   public void close() {
     if (hasProducedJobs && jobExecutor != null) {
-      log.debug("timers were produced, job executor will be signalled");
+      log.debug("timers were produced, job executor will be notified");
       synchronized (jobExecutor) {
         jobExecutor.notify();
       }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -46,18 +46,19 @@
 import org.jbpm.ejb.LocalCommandServiceHome;
 
 /**
- * This message-driven bean listens for {@link ObjectMessage object messages}
- * containing a command instance. The received commands are 
- * executed by the {@link CommandServiceBean command service} bean, using the
- * local interface.
+ * This message-driven bean listens for {@link ObjectMessage object messages} containing a command
+ * instance. The received commands are executed by the {@link CommandServiceBean command service}
+ * bean, using the local interface.
  * 
- * The body of the message must be a Java object that implements the {@link 
- * Command} interface. The message properties, if any, are ignored.
+ * The body of the message must be a Java object that implements the {@link Command} interface. The
+ * message properties, if any, are ignored.
  * 
  * <h3>Environment</h3>
  * 
- * <p>The environment entries and resources available for customization are
- * summarized in the table below.</p>
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
  * 
  * <table border="1">
  * <tr>
@@ -68,50 +69,41 @@
  * <tr>
  * <td><code>ejb/LocalCommandServiceBean</code></td>
  * <td>EJB Reference</td>
- * <td>Link to the local {@linkplain CommandServiceBean session bean} that
- * executes commands on a separate jBPM context.
- * </td>
+ * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on a
+ * separate jBPM context.</td>
  * </tr>
  * <tr>
  * <td><code>jms/JbpmConnectionFactory</code></td>
  * <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections for producing
- * result messages. Required for command messages that indicate a reply
- * destination.
- * </td>
+ * <td>Logical name of the factory that provides JMS connections for producing result messages.
+ * Required for command messages that indicate a reply destination.</td>
  * </tr>
  * <tr>
  * <td><code>jms/DeadLetterQueue</code></td>
  * <td>Message Destination Reference</td>
- * <td>Messages which do not contain a command are sent to the queue referenced
- * here. Optional; if absent, such messages are rejected, which may cause the 
- * container to redeliver.
- * </td>
+ * <td>Messages which do not contain a command are sent to the queue referenced here. Optional; if
+ * absent, such messages are rejected, which may cause the container to redeliver.</td>
  * </tr>
  * </table>
  * 
  * @author Jim Rigsbee
- * @author Tom Baeyens
  * @author Alejandro Guizar
+ * @author Tom Baeyens
  */
-public class CommandListenerBean implements MessageDrivenBean, MessageListener
-{
+public class CommandListenerBean implements MessageDrivenBean, MessageListener {
 
   private static final long serialVersionUID = 1L;
 
-  MessageDrivenContext messageDrivenContext = null;
+  MessageDrivenContext messageDrivenContext;
   LocalCommandService commandService;
   Connection jmsConnection;
   Destination deadLetterQueue;
 
-  public void onMessage(Message message)
-  {
-    try
-    {
+  public void onMessage(Message message) {
+    try {
       // extract command from message
       Command command = extractCommand(message);
-      if (command == null)
-      {
+      if (command == null) {
         discard(message);
         return;
       }
@@ -119,54 +111,42 @@
       Object result = commandService.execute(command);
       // send a response back if a "reply to" destination is set
       Destination replyTo = message.getJMSReplyTo();
-      if (replyTo != null && (result instanceof Serializable || result == null))
-      {
-        sendResult((Serializable)result, replyTo, message.getJMSMessageID());
+      if (replyTo != null && (result instanceof Serializable || result == null)) {
+        sendResult((Serializable) result, replyTo, message.getJMSMessageID());
       }
     }
-    catch (JMSException e)
-    {
+    catch (JMSException e) {
       messageDrivenContext.setRollbackOnly();
       log.error("could not process message " + message, e);
     }
   }
 
-  protected Command extractCommand(Message message) throws JMSException
-  {
-    Command command = null;
-    if (message instanceof ObjectMessage)
-    {
+  protected Command extractCommand(Message message) throws JMSException {
+    if (message instanceof ObjectMessage) {
       log.debug("deserializing command from jms message...");
-      ObjectMessage objectMessage = (ObjectMessage)message;
+      ObjectMessage objectMessage = (ObjectMessage) message;
       Serializable object = objectMessage.getObject();
-      if (object instanceof Command)
-      {
-        command = (Command)object;
+      if (object instanceof Command) {
+        return (Command) object;
       }
-      else
-      {
-        log.warn("ignoring object message cause it isn't a command '" + object + "'" + (object != null ? " (" + object.getClass().getName() + ")" : ""));
+      else {
+        log.warn("not a command: " + object);
       }
     }
-    else
-    {
-      log.warn("ignoring message '" + message + "' cause it isn't an ObjectMessage (" + message.getClass().getName() + ")");
+    else {
+      log.warn("not an object message: " + message);
     }
-    return command;
+    return null;
   }
 
-  private void discard(Message message) throws JMSException
-  {
-    if (deadLetterQueue == null)
-    {
+  private void discard(Message message) throws JMSException {
+    if (deadLetterQueue == null) {
       // lookup dead letter queue
-      try
-      {
+      try {
         Context initial = new InitialContext();
-        deadLetterQueue = (Destination)initial.lookup("java:comp/env/jms/DeadLetterQueue");
+        deadLetterQueue = (Destination) initial.lookup("java:comp/env/jms/DeadLetterQueue");
       }
-      catch (NamingException e)
-      {
+      catch (NamingException e) {
         log.debug("failed to retrieve dead letter queue, rejecting message: " + message);
         messageDrivenContext.setRollbackOnly();
         return;
@@ -174,70 +154,58 @@
     }
     // send message to dead letter queue
     Session jmsSession = createSession();
-    try
-    {
+    try {
       jmsSession.createProducer(deadLetterQueue).send(message);
     }
-    finally
-    {
+    finally {
       jmsSession.close();
     }
   }
 
-  private void sendResult(Serializable result, Destination destination, String correlationId) throws JMSException
-  {
+  private void sendResult(Serializable result, Destination destination, String correlationId)
+      throws JMSException {
     log.debug("sending result " + result + " to " + destination);
     Session jmsSession = createSession();
-    try
-    {
+    try {
       Message resultMessage = jmsSession.createObjectMessage(result);
       resultMessage.setJMSCorrelationID(correlationId);
       jmsSession.createProducer(destination).send(resultMessage);
     }
-    finally
-    {
+    finally {
       jmsSession.close();
     }
   }
 
-  private Session createSession() throws JMSException
-  {
-    if (jmsConnection == null)
-    {
+  private Session createSession() throws JMSException {
+    if (jmsConnection == null) {
       // lookup factory and create jms connection
-      try
-      {
+      try {
         Context initial = new InitialContext();
-        ConnectionFactory jmsConnectionFactory = (ConnectionFactory)initial.lookup("java:comp/env/jms/JbpmConnectionFactory");
+        ConnectionFactory jmsConnectionFactory = (ConnectionFactory) initial.lookup("java:comp/env/jms/JbpmConnectionFactory");
         jmsConnection = jmsConnectionFactory.createConnection();
       }
-      catch (NamingException e)
-      {
+      catch (NamingException e) {
         throw new EJBException("error retrieving jms connection factory", e);
       }
     }
     /*
-     * if the connection supports xa, the session will be transacted, else the session will auto acknowledge; in either case no explicit transaction control must be
-     * performed - see ejb 2.1 - 17.3.5
+     * if the connection supports xa, the session will be transacted, else the session will auto
+     * acknowledge; in either case no explicit transaction control must be performed - see ejb 2.1 -
+     * 17.3.5
      */
     return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   }
 
-  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext)
-  {
+  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
     this.messageDrivenContext = messageDrivenContext;
   }
 
-  public void ejbRemove()
-  {
-    if (jmsConnection != null)
-    {
-      try
-      {
+  public void ejbRemove() {
+    if (jmsConnection != null) {
+      try {
         jmsConnection.close();
       }
-      catch (JMSException e)
-      {
+      catch (JMSException e) {
         log.debug("failed to close jms connection", e);
       }
       jmsConnection = null;
@@ -247,20 +215,16 @@
     messageDrivenContext = null;
   }
 
-  public void ejbCreate()
-  {
-    try
-    {
+  public void ejbCreate() {
+    try {
       Context initial = new InitialContext();
-      LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome)initial.lookup("java:comp/env/ejb/LocalCommandServiceBean");
+      LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome) initial.lookup("java:comp/env/ejb/LocalCommandServiceBean");
       commandService = commandServiceHome.create();
     }
-    catch (NamingException e)
-    {
+    catch (NamingException e) {
       throw new EJBException("error retrieving command service home", e);
     }
-    catch (CreateException e)
-    {
+    catch (CreateException e) {
       throw new EJBException("error creating command service", e);
     }
   }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -28,119 +28,136 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.db.JobSession;
+import org.jbpm.graph.exe.Token;
 import org.jbpm.job.Job;
 import org.jbpm.msg.MessageService;
+import org.jbpm.svc.Services;
+import org.jbpm.taskmgmt.exe.TaskInstance;
+import org.jbpm.tx.TxService;
 
 public class JmsMessageService implements MessageService {
 
   private static final long serialVersionUID = 1L;
 
-  JobSession jobSession = null;
-  Connection connection = null;
-  Session session = null;
-  Destination destination = null;
-  MessageProducer messageProducer = null;
-  boolean isCommitEnabled = false;
+  private static final Log log = LogFactory.getLog(JmsMessageService.class);
 
-  public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled) throws JMSException {
+  final JobSession jobSession;
+
+  final Connection connection;
+  final Session session;
+  final MessageProducer messageProducer;
+
+  final boolean isCommitEnabled;
+
+  /**
+   * @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead
+   */
+  public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled)
+      throws JMSException {
     JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext==null) {
-      throw new JbpmException("jms message service must be created inside a jbpm context");
+    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+    jobSession = jbpmContext.getJobSession();
+
+    this.connection = connection;
+
+    if (isCommitEnabled) {
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      this.isCommitEnabled = true;
     }
-    this.jobSession = jbpmContext.getJobSession();
+    else {
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      this.isCommitEnabled = false;
+    }
 
-    this.connection = connection;
-    this.destination = destination;
-    this.isCommitEnabled = isCommitEnabled;
-    /* 
-     * If the connection supports XA, the session will always take part in the global transaction.
-     * Otherwise the first parameter specifies whether message productions and consumptions 
-     * are part of a single transaction (TRUE) or performed immediately (FALSE).
-     * Messages are never meant to be received before the database transaction commits,
-     * hence the transacted is preferable.
-     */
-    session = connection.createSession(true, Session.SESSION_TRANSACTED);
+    messageProducer = session.createProducer(destination);
   }
 
+  public JmsMessageService(JmsMessageServiceFactory factory) throws JMSException {
+    this(factory.getConnectionFactory().createConnection(), factory.getDestination(),
+        factory.isCommitEnabled());
+  }
+
   public void send(Job job) {
+    jobSession.saveJob(job);
+
     try {
-      jobSession.saveJob(job);
-      
       Message message = session.createMessage();
       message.setLongProperty("jobId", job.getId());
-      if (job.getToken()!=null) {
-        message.setLongProperty("tokenId", job.getToken().getId());
-      }
-      if (job.getProcessInstance()!=null) {
+
+      Token token = job.getToken();
+      if (token != null) {
+        message.setLongProperty("tokenId", token.getId());
         message.setLongProperty("processInstanceId", job.getProcessInstance().getId());
       }
-      if (job.getTaskInstance()!=null) {
-        message.setLongProperty("taskInstanceId", job.getTaskInstance().getId());
+
+      TaskInstance taskInstance = job.getTaskInstance();
+      if (taskInstance != null) {
+        message.setLongProperty("taskInstanceId", taskInstance.getId());
       }
+
       modifyMessage(message, job);
-      getMessageProducer().send(message);
-    } catch (JMSException e) {
-      throw new JbpmException("couldn't send jms message", e);
+      messageProducer.send(message);
     }
+    catch (JMSException e) {
+      throw new JbpmException("could not send jms message", e);
+    }
   }
 
   /**
-   * Hook to modify the message, e.g. adding additional properties
-   * to the header required by the own application. One possible 
-   * use case is to rescue the actor id over the "JMS" intermezzo
-   * of asynchronous continuations.
+   * Hook to modify the message, e.g. adding extra properties to the header required by the own
+   * application. One possible use case is to rescue the actor id over the "JMS" intermezzo of
+   * asynchronous continuations.
    */
-  public void modifyMessage(Message message, Job job) throws JMSException {
+  protected void modifyMessage(Message message, Job job) throws JMSException {
   }
 
   public void close() {
-    JbpmException exception = null;
+    try {
+      messageProducer.close();
+    }
+    catch (JMSException e) {
+      log.warn("could not close message producer", e);
+    }
 
-    if (messageProducer!=null) {
+    JMSException commitException = null;
+    if (isCommitEnabled) {
+      TxService txService = (TxService) Services.getCurrentService(Services.SERVICENAME_TX);
       try {
-        messageProducer.close();
-      } catch (Exception e) {
-        // NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
-        exception = new JbpmException("couldn't close message producer", e);
-      }
-    }
-
-    if (session!=null) {
-      if (isCommitEnabled) {
-        try {
+        if (txService.isRollbackOnly()) {
+          session.rollback();
+        }
+        else {
           session.commit();
-        } catch (Exception e) {
-          if (exception==null) {
-            exception = new JbpmException("couldn't commit JMS session", e);
-          }
         }
       }
-      
-      try {
-        session.close();
-      } catch (Exception e) {
-        if (exception==null) {
-          exception = new JbpmException("couldn't close JMS session", e);
-        }
+      catch (JMSException e) {
+        commitException = e;
       }
     }
 
-    if (connection!=null) {
-      try {
-        connection.close();
-      } catch (Exception e) {
-        if (exception==null) {
-          exception = new JbpmException("couldn't close JMS connection", e);
-        }
-      }
+    try {
+      session.close();
     }
+    catch (JMSException e) {
+      log.warn("could not close jms session", e);
+    }
 
-    if (exception!=null) {
-      throw exception;
+    try {
+      connection.close();
     }
+    catch (JMSException e) {
+      log.warn("could not close jms connection", e);
+    }
+
+    if (commitException != null) {
+      throw new JbpmException("could not commit jms session", commitException);
+    }
   }
 
   public Session getSession() {
@@ -148,9 +165,6 @@
   }
 
   protected MessageProducer getMessageProducer() throws JMSException {
-    if (messageProducer==null) {
-      messageProducer = session.createProducer(destination);
-    }
     return messageProducer;
   }
 }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -21,7 +21,6 @@
  */
 package org.jbpm.msg.jms;
 
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -35,9 +34,8 @@
 import org.jbpm.svc.ServiceFactory;
 
 /**
- * The JMS message service leverages the reliable communication infrastructure
- * accessed through JMS interfaces to deliver asynchronous continuation
- * messages to the {@link JobListenerBean}.
+ * The JMS message service leverages the reliable communication infrastructure available through JMS
+ * interfaces to deliver asynchronous continuation messages to the {@link JobListenerBean}.
  * 
  * <h3>Configuration</h3>
  * 
@@ -54,103 +52,68 @@
  * @author Tom Baeyens
  * @author Alejandro Guizar
  */
-public class JmsMessageServiceFactory implements ServiceFactory
-{
+public class JmsMessageServiceFactory implements ServiceFactory {
+
   private static final long serialVersionUID = 1L;
 
   String connectionFactoryJndiName = "java:comp/env/jms/JbpmConnectionFactory";
   String destinationJndiName = "java:comp/env/jms/JobQueue";
-  String commandDestinationJndiName = "java:comp/env/jms/CommandQueue";
-  boolean isCommitEnabled = false;
 
+  boolean isCommitEnabled;
+
   private ConnectionFactory connectionFactory;
   private Destination destination;
-  private Destination commandDestination;
 
-  public ConnectionFactory getConnectionFactory()
-  {
-    if (connectionFactory == null)
-    {
-      try
-      {
-        connectionFactory = (ConnectionFactory)lookup(connectionFactoryJndiName);
+  public ConnectionFactory getConnectionFactory() {
+    if (connectionFactory == null) {
+      try {
+        connectionFactory = (ConnectionFactory) lookup(connectionFactoryJndiName);
       }
-      catch (NamingException e)
-      {
+      catch (NamingException e) {
         throw new JbpmException("could not retrieve message connection factory", e);
       }
     }
     return connectionFactory;
   }
 
-  public Destination getDestination()
-  {
-    if (destination == null)
-    {
-      try
-      {
-        destination = (Destination)lookup(destinationJndiName);
+  public Destination getDestination() {
+    if (destination == null) {
+      try {
+        destination = (Destination) lookup(destinationJndiName);
       }
-      catch (NamingException e)
-      {
+      catch (NamingException e) {
         throw new JbpmException("could not retrieve job destination", e);
       }
     }
     return destination;
   }
 
-  public Destination getCommandDestination()
-  {
-    if (commandDestination == null)
-    {
-      try
-      {
-        commandDestination = (Destination)lookup(commandDestinationJndiName);
-      }
-      catch (NamingException e)
-      {
-        throw new JbpmException("could not retrieve command destination", e);
-      }
-    }
-    return commandDestination;
-  }
-
-  public boolean isCommitEnabled()
-  {
+  public boolean isCommitEnabled() {
     return isCommitEnabled;
   }
 
-  private static Object lookup(String name) throws NamingException
-  {
+  private static Object lookup(String name) throws NamingException {
     Context initial = new InitialContext();
-    try
-    {
+    try {
       return initial.lookup(name);
     }
-    finally
-    {
+    finally {
       initial.close();
     }
   }
 
-  public Service openService()
-  {
-    try
-    {
-      Connection connection = getConnectionFactory().createConnection();
-      return new JmsMessageService(connection, getDestination(), isCommitEnabled);
+  public Service openService() {
+    try {
+      return new JmsMessageService(this);
     }
-    catch (JMSException e)
-    {
-      throw new JbpmException("couldn't open message session", e);
+    catch (JMSException e) {
+      throw new JbpmException("could not open message service", e);
     }
   }
 
-  public void close()
-  {
+  public void close() {
     connectionFactory = null;
     destination = null;
-    commandDestination = null;
   }
 
 }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -28,8 +28,8 @@
 import org.jbpm.svc.Service;
 
 /**
- * Alias for {@link JmsMessageServiceFactory}, supplied for compatibility
- * with the jBPM 3.2 series.
+ * Alias for {@link JmsMessageServiceFactory}, supplied for compatibility with jBPM 3.2.2.
+ * 
  * @deprecated replaced by {@link JmsMessageServiceFactory}
  */
 public class JmsMessageServiceFactoryImpl extends JmsMessageServiceFactory {
@@ -40,10 +40,10 @@
     try {
       Connection connection = getConnectionFactory().createConnection();
       return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled);
-    } 
+    }
     catch (JMSException e) {
       throw new JbpmException("couldn't open message session", e);
-    } 
+    }
   }
 
 }

Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java	2009-08-07 08:54:13 UTC (rev 5441)
@@ -26,15 +26,16 @@
 import javax.jms.JMSException;
 
 /**
- * Alias for {@link JmsMessageService}, supplied for compatibility with the
- * jBPM 3.2 series.
+ * Alias for {@link JmsMessageService}, supplied for compatibility with jBPM 3.2.2.
+ * 
  * @deprecated replaced by {@link JmsMessageService}
  */
 public class JmsMessageServiceImpl extends JmsMessageService {
 
   private static final long serialVersionUID = 1L;
-  
-  public JmsMessageServiceImpl(Connection connection, Destination destination, boolean isCommitEnabled) throws JMSException {
+
+  public JmsMessageServiceImpl(Connection connection, Destination destination,
+      boolean isCommitEnabled) throws JMSException {
     super(connection, destination, isCommitEnabled);
   }
 

Modified: jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml	2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml	2009-08-07 08:54:13 UTC (rev 5441)
@@ -261,24 +261,35 @@
 
     <itemizedlist>
 
-      <listitem><literal>connectionFactoryJndiName</literal>: the name of the JMS connection factory
-      in the JNDI initial context. Defaults to <literal>java:comp/env/jms/JbpmConnectionFactory</literal>.
+      <listitem><literal>connectionFactoryJndiName</literal>: the JNDI name of the JMS connection
+      factory. Defaults to <literal>java:comp/env/jms/JbpmConnectionFactory</literal>.
       </listitem>
 
-      <listitem><literal>destinationJndiName</literal>: the name of the JMS destination where job 
-      messages will be sent. Must match the destination from which <literal>JobListenerBean</literal>
+      <listitem><literal>destinationJndiName</literal>: the JNDI name of the JMS destination where
+      job messages are sent. Must match the destination where <literal>JobListenerBean</literal>
       receives messages. Defaults to <literal>java:comp/env/jms/JobQueue</literal>.
       </listitem>
 
-      <listitem><literal>isCommitEnabled</literal>: tells whether jBPM should commit the JMS session
-      upon <literal>JbpmContext.close()</literal>. Messages produced by the JMS message service are 
-      never meant to be received before the current transaction commits; hence the JMS sessions 
-      created by the service are always transacted. The default value -<literal>false</literal>- is
-      appropriate when the connection factory in use is XA capable, as the JMS session's produced 
-      messages will be controlled by the overall JTA transaction. This field should be set to 
-      <literal>true</literal> if the JMS connection factory is not XA capable so that jBPM commits
-      the JMS session's local transaction explicitly.
+      <listitem><literal>isCommitEnabled</literal>: tells whether the message service should
+      create a transacted session and either commit or rollback on close. Messages produced by the
+      JMS message service are never meant to be received before the database transaction commits.
+      The <ulink url="http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS7.html#wp92011">J2EE
+      tutorial</ulink> states "when you create a session in an enterprise bean, the container
+      ignores the arguments you specify, because it manages all transactional properties for
+      enterprise beans". Unfortunately the tutorial fails to indicate that said behavior is not 
+      prescriptive. JBoss ignores the <literal>transacted</literal> argument if the 
+      connection factory supports XA, since the overall JTA transaction controls the session.
+      Otherwise, <literal>transacted</literal> produces a locally transacted session. In <ulink 
+      url="http://e-docs.bea.com/wls/docs103/jms/trans.html">Weblogic</ulink>, JMS transacted
+      sessions are agnostic to JTA transactions even if the connection factory is XA enabled.
+      With <literal>isCommitEnabled</literal> set to <literal>false</literal> (the default),
+      the message service creates a nontransacted, auto-acknowledging session. Such a session
+      works with containers that either disregard the creation arguments or do not bind
+      transacted sessions to JTA. Conversely, setting <literal>isCommitEnabled</literal> to 
+      <literal>true</literal> causes the message service to create a transacted session and commit
+      or rollback according to the <literal>TxService.isRollbackOnly</literal> method.
       </listitem>
+
     </itemizedlist>
 
     <para><literal>EntitySchedulerServiceFactory</literal> builds on the transactional notification



More information about the jbpm-commits mailing list