[jbpm-commits] JBoss JBPM SVN: r6838 - in jbpm3/branches/jbpm-3.2-soa: enterprise/src/main/java/org/jbpm/scheduler/ejbtimer and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 23 08:45:17 EST 2010


Author: alex.guizar at jboss.com
Date: 2010-11-23 08:45:16 -0500 (Tue, 23 Nov 2010)
New Revision: 6838

Modified:
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java
   jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java
   jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java
Log:
reference as few objects as possible in jms message/connector and ejb/entity scheduler services

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -28,84 +28,82 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.db.JobSession;
-import org.jbpm.graph.exe.Token;
 import org.jbpm.job.Job;
 import org.jbpm.msg.MessageService;
 import org.jbpm.svc.Services;
-import org.jbpm.taskmgmt.exe.TaskInstance;
-import org.jbpm.tx.TxService;
 
 public class JmsMessageService implements MessageService {
 
-  private static final long serialVersionUID = 1L;
+  private final JmsMessageServiceFactory serviceFactory;
 
-  private static final Log log = LogFactory.getLog(JmsMessageService.class);
+  private final Connection connection;
+  private final Session session;
 
-  final JobSession jobSession;
+  private static final long serialVersionUID = 2L;
+  private static final String GROUP_ID_PROP = "JMSXGroupID";
+  private static final String GROUP_PREFIX = "jBPMPID";
 
-  final Connection connection;
-  final Session session;
-  final MessageProducer messageProducer;
+  /** @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead */
+  public JmsMessageService(Connection connection, Destination destination, boolean commitEnabled)
+    throws JMSException {
+    this.connection = connection;
+    session = createSession(connection);
+    serviceFactory = (JmsMessageServiceFactory) Services.getCurrentService(Services.SERVICENAME_MESSAGE);
+  }
 
-  final boolean isCommitEnabled;
+  public JmsMessageService(JmsMessageServiceFactory serviceFactory) throws JMSException {
+    connection = serviceFactory.getConnectionFactory().createConnection();
+    session = createSession(connection);
+    this.serviceFactory = serviceFactory;
+  }
 
   /**
-   * @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead
+   * EJB 2.1, section 17.3.5: Because the container manages the transactional enlistment of JMS
+   * sessions on behalf of a bean, the parameters of the
+   * {@link Connection#createSession(boolean, int) createSession} method are ignored. It is
+   * recommended that the Bean Provider specify that a session is transacted, but provide
+   * <code>0</code> for the value of the acknowledgment mode.
+   * <p>
+   * Nonetheless, in <a href="http://publib.boulder.ibm.com/infocenter/adiehelp/v5r1m1/topic/com.ibm.wasee.doc/info/ee/ae/tmj_ep.html"
+   * >WebSphere</a>, if the transacted flag is set to true outside of a transaction, the
+   * application should use {@link Session#commit() commit} or {@link Session#rollback()
+   * rollback} to control the completion of the work.
+   * </p>
+   * <p>
+   * Therefore, the safest course of action is to create a session with the parameters
+   * <code>false</code> and {@link Session#AUTO_ACKNOWLEDGE}.</p>
    */
-  public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled)
-      throws JMSException {
-    JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
-    jobSession = jbpmContext.getJobSession();
+  private static Session createSession(Connection connection) throws JMSException {
+    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+  }
 
-    this.connection = connection;
+  public void send(Job job) {
+    getJobSession().saveJob(job);
 
-    if (isCommitEnabled) {
-      session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      this.isCommitEnabled = true;
+    try {
+      sendMessage(job);
     }
-    else {
-      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      this.isCommitEnabled = false;
+    catch (JMSException e) {
+      throw new JbpmException("could not send jms message", e);
     }
-
-    messageProducer = session.createProducer(destination);
   }
 
-  public JmsMessageService(JmsMessageServiceFactory factory) throws JMSException {
-    this(factory.getConnectionFactory().createConnection(), factory.getDestination(),
-        factory.isCommitEnabled());
+  private JobSession getJobSession() {
+    return serviceFactory.getJbpmConfiguration().getCurrentJbpmContext().getJobSession();
   }
 
-  public void send(Job job) {
-    jobSession.saveJob(job);
+  private void sendMessage(Job job) throws JMSException {
+    Message message = getSession().createMessage();
+    modifyMessage(message, job);
 
+    MessageProducer messageProducer = getMessageProducer();
     try {
-      Message message = session.createMessage();
-      message.setLongProperty("jobId", job.getId());
-
-      Token token = job.getToken();
-      if (token != null) {
-        message.setLongProperty("tokenId", token.getId());
-        message.setLongProperty("processInstanceId", job.getProcessInstance().getId());
-      }
-
-      TaskInstance taskInstance = job.getTaskInstance();
-      if (taskInstance != null) {
-        message.setLongProperty("taskInstanceId", taskInstance.getId());
-      }
-
-      modifyMessage(message, job);
       messageProducer.send(message);
     }
-    catch (JMSException e) {
-      throw new JbpmException("could not send jms message", e);
+    finally {
+      messageProducer.close();
     }
   }
 
@@ -115,49 +113,21 @@
    * asynchronous continuations.
    */
   protected void modifyMessage(Message message, Job job) throws JMSException {
+    message.setLongProperty("jobId", job.getId());
+
+    if (job.isExclusive()) {
+      message.setStringProperty(GROUP_ID_PROP, GROUP_PREFIX + job.getProcessInstance().getId());
+    }
   }
 
   public void close() {
+    // there is no need to close the sessions and producers of a closed connection
     try {
-      messageProducer.close();
-    }
-    catch (JMSException e) {
-      log.warn("could not close message producer", e);
-    }
-
-    JMSException commitException = null;
-    if (isCommitEnabled) {
-      TxService txService = (TxService) Services.getCurrentService(Services.SERVICENAME_TX);
-      try {
-        if (txService.isRollbackOnly()) {
-          session.rollback();
-        }
-        else {
-          session.commit();
-        }
-      }
-      catch (JMSException e) {
-        commitException = e;
-      }
-    }
-
-    try {
-      session.close();
-    }
-    catch (JMSException e) {
-      log.warn("could not close jms session", e);
-    }
-
-    try {
       connection.close();
     }
     catch (JMSException e) {
-      log.warn("could not close jms connection", e);
+      throw new JbpmException("could not close jms connection", e);
     }
-
-    if (commitException != null) {
-      throw new JbpmException("could not commit jms session", commitException);
-    }
   }
 
   public Session getSession() {
@@ -165,6 +135,6 @@
   }
 
   protected MessageProducer getMessageProducer() throws JMSException {
-    return messageProducer;
+    return getSession().createProducer(serviceFactory.getDestination());
   }
 }

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -25,10 +25,13 @@
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
 import org.jbpm.ejb.impl.JobListenerBean;
 import org.jbpm.svc.Service;
 import org.jbpm.svc.ServiceFactory;
+import org.jbpm.svc.Services;
 import org.jbpm.util.JndiUtil;
 
 /**
@@ -42,7 +45,6 @@
  * <ul>
  * <li><code>connectionFactoryJndiName</code></li>
  * <li><code>destinationJndiName</code></li>
- * <li><code>isCommitEnabled</code></li>
  * </ul>
  * 
  * Refer to the jBPM manual for details.
@@ -57,11 +59,22 @@
   String connectionFactoryJndiName = "java:comp/env/jms/JbpmConnectionFactory";
   String destinationJndiName = "java:comp/env/jms/JobQueue";
 
-  boolean isCommitEnabled;
-
+  private JbpmConfiguration jbpmConfiguration;
   private ConnectionFactory connectionFactory;
   private Destination destination;
 
+  public JbpmConfiguration getJbpmConfiguration() {
+    // if this field was not injected
+    if (jbpmConfiguration == null) {
+      // set to current context
+      JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+      if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+      assert this == jbpmContext.getServiceFactory(Services.SERVICENAME_MESSAGE) : jbpmContext.getServiceFactory(Services.SERVICENAME_MESSAGE);
+      jbpmConfiguration = jbpmContext.getJbpmConfiguration();
+    }
+    return jbpmConfiguration;
+  }
+
   public synchronized ConnectionFactory getConnectionFactory() {
     if (connectionFactory == null) {
       connectionFactory = (ConnectionFactory) JndiUtil.lookup(connectionFactoryJndiName, ConnectionFactory.class);
@@ -76,8 +89,12 @@
     return destination;
   }
 
+  /**
+   * @deprecated the EJB container manages the transactional enlistment of JMS sessions
+   * @return <code>false</code>
+   */
   public boolean isCommitEnabled() {
-    return isCommitEnabled;
+    return false;
   }
 
   public Service openService() {

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -39,7 +39,7 @@
   public Service openService() {
     try {
       Connection connection = getConnectionFactory().createConnection();
-      return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled);
+      return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled());
     }
     catch (JMSException e) {
       throw new JbpmException("couldn't open message session", e);

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -5,10 +5,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.hibernate.Session;
+
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
 import org.jbpm.graph.exe.ProcessInstance;
 import org.jbpm.graph.exe.Token;
 import org.jbpm.job.Timer;
@@ -20,39 +19,34 @@
  */
 public class EjbSchedulerService implements SchedulerService {
 
-  private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 2L;
 
-  private JobSession jobSession;
-  private Session session;
+  private JbpmContext jbpmContext;
   private LocalTimerService timerService;
 
   public EjbSchedulerService(LocalTimerServiceHome timerServiceHome) {
-    JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext == null) {
-      throw new JbpmException("instantiation of the EjbSchedulerService requires a current JbpmContext");
-    }
-    this.jobSession = jbpmContext.getJobSession();
-    this.session = jbpmContext.getSession();
+    jbpmContext = JbpmContext.getCurrentJbpmContext();
+    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
 
     try {
       timerService = timerServiceHome.create();
     }
     catch (CreateException e) {
-      throw new JbpmException("ejb local timer creation problem", e);
+      throw new JbpmException("failed to create local timer service", e);
     }
   }
 
   public void createTimer(Timer timer) {
     log.debug("creating " + timer);
-    jobSession.saveJob(timer);
-    session.flush();
+    jbpmContext.getJobSession().saveJob(timer);
+    jbpmContext.getSession().flush();
     timerService.createTimer(timer);
   }
 
   public void deleteTimer(Timer timer) {
     if (log.isDebugEnabled()) log.debug("deleting " + timer);
     timerService.cancelTimer(timer);
-    jobSession.deleteJob(timer);
+    jbpmContext.getJobSession().deleteJob(timer);
   }
 
   public void deleteTimersByName(String timerName, Token token) {
@@ -60,18 +54,17 @@
       log.debug("deleting timers by name '" + timerName + "' for " + token);
     }
     timerService.cancelTimersByName(timerName, token);
-    jobSession.deleteTimersByName(timerName, token);
+    jbpmContext.getJobSession().deleteTimersByName(timerName, token);
   }
 
   public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
     if (log.isDebugEnabled()) log.debug("deleting timers for " + processInstance);
     timerService.cancelTimersForProcessInstance(processInstance);
-    jobSession.deleteJobsForProcessInstance(processInstance);
+    jbpmContext.getJobSession().deleteJobsForProcessInstance(processInstance);
   }
 
   public void close() {
     try {
-      if (log.isDebugEnabled()) log.debug("removing the timer service session bean");
       timerService.remove();
     }
     catch (RemoveException e) {

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -7,10 +7,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.hibernate.Session;
+
 import org.jbpm.JbpmContext;
 import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
 import org.jbpm.ejb.LocalTimerEntity;
 import org.jbpm.ejb.LocalTimerEntityHome;
 import org.jbpm.graph.exe.ProcessInstance;
@@ -20,26 +19,22 @@
 
 public class EntitySchedulerService implements SchedulerService {
 
-  private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 2L;
 
-  JobSession jobSession;
-  Session session;
-  LocalTimerEntityHome timerEntityHome;
+  private JbpmContext jbpmContext;
+  private LocalTimerEntityHome timerEntityHome;
 
   public EntitySchedulerService(LocalTimerEntityHome timerEntityHome) {
-    JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
-    if (jbpmContext == null) {
-      throw new JbpmException("entity scheduler service must be created inside a jbpm context");
-    }
-    this.jobSession = jbpmContext.getJobSession();
-    this.session = jbpmContext.getSession();
+    jbpmContext = JbpmContext.getCurrentJbpmContext();
+    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
     this.timerEntityHome = timerEntityHome;
   }
 
   public void createTimer(Timer timer) {
     if (log.isDebugEnabled()) log.debug("creating " + timer);
-    jobSession.saveJob(timer);
-    session.flush();
+    jbpmContext.getJobSession().saveJob(timer);
+    jbpmContext.getSession().flush();
+
     try {
       LocalTimerEntity timerEntity = timerEntityHome.findByPrimaryKey(new Long(timer.getId()));
       timerEntity.createTimer(timer);
@@ -58,7 +53,7 @@
     catch (FinderException e) {
       log.error("failed to retrieve entity for " + timer, e);
     }
-    jobSession.deleteJob(timer);
+    jbpmContext.getJobSession().deleteJob(timer);
   }
 
   public void deleteTimersByName(String timerName, Token token) {
@@ -76,7 +71,7 @@
     catch (FinderException e) {
       log.error("failed to retrieve timer entities by name '" + timerName + "' for " + token, e);
     }
-    jobSession.deleteTimersByName(timerName, token);
+    jbpmContext.getJobSession().deleteTimersByName(timerName, token);
   }
 
   public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
@@ -93,11 +88,11 @@
     catch (FinderException e) {
       log.error("failed to retrieve timer entities for " + processInstance, e);
     }
-    jobSession.deleteJobsForProcessInstance(processInstance);
+    jbpmContext.getJobSession().deleteJobsForProcessInstance(processInstance);
   }
 
   public void close() {
-    timerEntityHome = null;
+    // nothing to do here
   }
 
   private static final Log log = LogFactory.getLog(EntitySchedulerService.class);

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml	2010-11-23 13:45:16 UTC (rev 6838)
@@ -2,35 +2,31 @@
 
   <jbpm-context>
     <service name="persistence" factory="org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory" />
-    	<service name="message">
-		<factory>
-			<ref bean="jbpm.jms.connector.command.service" />
-		</factory>
-		</service>
-		<service name="scheduler">
-		<factory>
-			<ref bean="jbpm.jms.connector.job.service" />
-		</factory>
-	</service>
+    <service name="message">
+      <factory>
+        <ref bean="jbpm.jms.connector.job.service" />
+      </factory>
+    </service>
+    <service name="scheduler">
+      <factory>
+        <ref bean="jbpm.jms.connector.job.service" />
+      </factory>
+    </service>
     <service name="tx" factory="org.jbpm.tx.TxServiceFactory" />
     <service name="logging" factory="org.jbpm.logging.db.DbLoggingServiceFactory" />
     <service name="authentication" factory="org.jbpm.security.authentication.DefaultAuthenticationServiceFactory" />
   </jbpm-context>
 
-  <bean name="jbpm.jms.connector.command.service" class="org.jbpm.jms.JmsConnectorServiceFactory">
-  	<field name="jbpmConfiguration">
-      <ref bean="jbpm.configuration" />
-    </field>
-	<property name="connectionFactoryJndiName"><string value="java:JmsXA"/></property>
-	<property name="destinationJndiName"><string value="queue/JbpmJobQueue"/></property>
-  </bean>
-  
   <bean name="jbpm.jms.connector.job.service" class="org.jbpm.jms.JmsConnectorServiceFactory">
-  	<field name="jbpmConfiguration">
+    <field name="jbpmConfiguration">
       <ref bean="jbpm.configuration" />
     </field>
-	<property name="connectionFactoryJndiName"><string value="java:JmsXA"/></property>
-	<property name="destinationJndiName"><string value="queue/JbpmJobQueue"/></property>
+    <property name="connectionFactoryJndiName">
+      <string value="java:JmsXA" />
+    </property>
+    <property name="destinationJndiName">
+      <string value="queue/JbpmJobQueue" />
+    </property>
   </bean>
 
   <!-- employ the context class loader -->

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -92,7 +92,7 @@
       Timer timer = jbpmContext.getJobSession().loadTimer(job.getId());
       JmsConnectorService schedulerService = (JmsConnectorService) jbpmContext.getServices()
         .getSchedulerService();
-      schedulerService.sendWithoutSaving(timer);
+      schedulerService.sendMessage(timer);
     }
   }
 

Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java	2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java	2010-11-23 13:45:16 UTC (rev 6838)
@@ -21,9 +21,6 @@
  */
 package org.jbpm.jms;
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -32,7 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jbpm.JbpmContext;
+
 import org.jbpm.JbpmException;
 import org.jbpm.db.JobSession;
 import org.jbpm.graph.exe.ProcessInstance;
@@ -43,56 +40,48 @@
 import org.jbpm.scheduler.SchedulerService;
 
 public class JmsConnectorService implements MessageService, SchedulerService {
-  
-  private final JobSession jobSession;
-  private final JmsConnectorServiceFactory factory;
-  private static final String JMS_PROP_SCHEDULED = "JMS_JBOSS_SCHEDULED_DELIVERY";
-  private static final String JMS_PROP_GROUP = "JMSXGroupID";
-  private static final String JBPM_GROUP_PREFIX = "jBPMPID";
-  
-  private static final DateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at' HH:mm:ss z");
-  private static final long serialVersionUID = 1L;
 
+  private final JmsConnectorServiceFactory serviceFactory;
+  private final Connection connection;
+
+  private static final String SCHEDULED_DELIVERY_PROP = "JMS_JBOSS_SCHEDULED_DELIVERY";
+  private static final String GROUP_ID_PROP = "JMSXGroupID";
+  private static final String GROUP_PREFIX = "jBPMPID";
+  private static final long serialVersionUID = 2L;
+
   private static final Log log = LogFactory.getLog(JmsConnectorService.class);
-  
-  public JmsConnectorService(JmsConnectorServiceFactory factory) throws JMSException {
-    JbpmContext jbpmContext = factory.getJbpmConfiguration().getCurrentJbpmContext();
-    if (jbpmContext == null) throw new JbpmException("no active jbpm context");
-    jobSession = jbpmContext.getJobSession();
-    
-    this.factory = factory;
+
+  JmsConnectorService(JmsConnectorServiceFactory serviceFactory) throws JMSException {
+    connection = serviceFactory.getConnectionFactory().createConnection();
+    this.serviceFactory = serviceFactory;
   }
 
+  private JobSession getJobSession() {
+    return serviceFactory.getJbpmConfiguration().getCurrentJbpmContext().getJobSession();
+  }
+
   public void send(Job job) {
-    jobSession.saveJob(job);
+    getJobSession().saveJob(job);
     try {
-      sendWithoutSaving(job);
+      sendMessage(job);
     }
     catch (JMSException e) {
       throw new JbpmException("failed to send job message", e);
     }
   }
 
-  void sendWithoutSaving(Job job) throws JMSException {
-	  Connection connection = factory.getConnectionFactory().createConnection();
+  final void sendMessage(Job job) throws JMSException {
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     try {
-      Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
       Message message = session.createMessage();
       populateMessage(message, job);
 
-      MessageProducer messageProducer = session.createProducer(factory.getDestination());
+      MessageProducer messageProducer = session.createProducer(serviceFactory.getDestination());
       messageProducer.send(message);
     }
     finally {
-      // there is no need to close the sessions and producers of a closed connection
-      // http://download.oracle.com/javaee/1.4/api/javax/jms/Connection.html#close()
-      try {
-        connection.close();
-      }
-      catch (JMSException e) {
-        log.warn("failed to close jms connection", e);
-      }
+      // there is no need to close the producers of a closed session
+      session.close();
     }
   }
 
@@ -101,21 +90,17 @@
 
     if (job instanceof Timer) {
       Timer timer = (Timer) job;
-      
-      if(log.isDebugEnabled()) {
-    	log.debug("Creating timer for job: "+job.getId()+" to execute at: "+(timer.getDueDate() == null ? "no set date" : sdf.format(timer.getDueDate())));
+
+      if (log.isDebugEnabled()) {
+        log.debug("scheduling " + timer + " to execute at " + timer.getDueDate());
       }
-      message.setLongProperty(JMS_PROP_SCHEDULED, timer.getDueDate().getTime());
-      //higher priority for timers.
+      message.setLongProperty(SCHEDULED_DELIVERY_PROP, timer.getDueDate().getTime());
+      // raise timer priority
       message.setJMSPriority(9);
     }
-    
-    if(!(job instanceof Timer))
-    {
-    	if(job.isExclusive())
-    	{
-    		message.setStringProperty(JMS_PROP_GROUP, JBPM_GROUP_PREFIX+job.getProcessInstance().getId());
-    	}
+
+    if (job.isExclusive()) {
+      message.setStringProperty(GROUP_ID_PROP, GROUP_PREFIX + job.getProcessInstance().getId());
     }
   }
 
@@ -124,18 +109,23 @@
   }
 
   public void deleteTimer(Timer timer) {
-    jobSession.deleteJob(timer);
+    getJobSession().deleteJob(timer);
   }
 
   public void deleteTimersByName(String timerName, Token token) {
-    jobSession.deleteTimersByName(timerName, token);
+    getJobSession().deleteTimersByName(timerName, token);
   }
 
   public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
-    jobSession.deleteJobsForProcessInstance(processInstance);
+    getJobSession().deleteJobsForProcessInstance(processInstance);
   }
 
   public void close() {
-	//nothing to do... 
+    try {
+      connection.close();
+    }
+    catch (JMSException e) {
+      throw new JbpmException("failed to close jms connection", e);
+    }
   }
 }



More information about the jbpm-commits mailing list