[jbpm-commits] JBoss JBPM SVN: r6838 - in jbpm3/branches/jbpm-3.2-soa: enterprise/src/main/java/org/jbpm/scheduler/ejbtimer and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Nov 23 08:45:17 EST 2010
Author: alex.guizar at jboss.com
Date: 2010-11-23 08:45:16 -0500 (Tue, 23 Nov 2010)
New Revision: 6838
Modified:
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java
jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java
jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java
Log:
reference as few objects as possible in jms message/connector and ejb/entity scheduler services
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -28,84 +28,82 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.db.JobSession;
-import org.jbpm.graph.exe.Token;
import org.jbpm.job.Job;
import org.jbpm.msg.MessageService;
import org.jbpm.svc.Services;
-import org.jbpm.taskmgmt.exe.TaskInstance;
-import org.jbpm.tx.TxService;
public class JmsMessageService implements MessageService {
- private static final long serialVersionUID = 1L;
+ private final JmsMessageServiceFactory serviceFactory;
- private static final Log log = LogFactory.getLog(JmsMessageService.class);
+ private final Connection connection;
+ private final Session session;
- final JobSession jobSession;
+ private static final long serialVersionUID = 2L;
+ private static final String GROUP_ID_PROP = "JMSXGroupID";
+ private static final String GROUP_PREFIX = "jBPMPID";
- final Connection connection;
- final Session session;
- final MessageProducer messageProducer;
+ /** @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead */
+ public JmsMessageService(Connection connection, Destination destination, boolean commitEnabled)
+ throws JMSException {
+ this.connection = connection;
+ session = createSession(connection);
+ serviceFactory = (JmsMessageServiceFactory) Services.getCurrentService(Services.SERVICENAME_MESSAGE);
+ }
- final boolean isCommitEnabled;
+ public JmsMessageService(JmsMessageServiceFactory serviceFactory) throws JMSException {
+ connection = serviceFactory.getConnectionFactory().createConnection();
+ session = createSession(connection);
+ this.serviceFactory = serviceFactory;
+ }
/**
- * @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead
+ * EJB 2.1 section 17.3.5 Because the container manages the transactional enlistment of JMS
+ * sessions on behalf of a bean, the parameters of the
+ * {@link Connection#createSession(boolean, int) createSession} method are ignored. It is
+ * recommended that the Bean Provider specify that a session is transacted, but provide
+ * <code>0</code> for the value of the acknowledgment mode.
+ * <p>
+ * Nonetheless, in <a href="http://publib.boulder.ibm.com/infocenter/adiehelp/v5r1m1/topic/com.ibm.wasee.doc/info/ee/ae/tmj_ep.html"
+ * >WebSphere</a>, if the transacted flag is set to true outside of a transaction, the
+ * application should use {@link Session#commit() commit} or {@link Session#rollback()
+ * rollback} to control the completion of the work.
+ * </p>
+ * <p>
+ * Therefore, the safest course of action is to create a session with the parameters
+ * <code>false</code> and {@link Session#AUTO_ACKNOWLEDGE}.
*/
- public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled)
- throws JMSException {
- JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext == null) throw new JbpmException("no active jbpm context");
- jobSession = jbpmContext.getJobSession();
+ private static Session createSession(Connection connection) throws JMSException {
+ return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
- this.connection = connection;
+ public void send(Job job) {
+ getJobSession().saveJob(job);
- if (isCommitEnabled) {
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
- this.isCommitEnabled = true;
+ try {
+ sendMessage(job);
}
- else {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- this.isCommitEnabled = false;
+ catch (JMSException e) {
+ throw new JbpmException("could not send jms message", e);
}
-
- messageProducer = session.createProducer(destination);
}
- public JmsMessageService(JmsMessageServiceFactory factory) throws JMSException {
- this(factory.getConnectionFactory().createConnection(), factory.getDestination(),
- factory.isCommitEnabled());
+ private JobSession getJobSession() {
+ return serviceFactory.getJbpmConfiguration().getCurrentJbpmContext().getJobSession();
}
- public void send(Job job) {
- jobSession.saveJob(job);
+ private void sendMessage(Job job) throws JMSException {
+ Message message = getSession().createMessage();
+ modifyMessage(message, job);
+ MessageProducer messageProducer = getMessageProducer();
try {
- Message message = session.createMessage();
- message.setLongProperty("jobId", job.getId());
-
- Token token = job.getToken();
- if (token != null) {
- message.setLongProperty("tokenId", token.getId());
- message.setLongProperty("processInstanceId", job.getProcessInstance().getId());
- }
-
- TaskInstance taskInstance = job.getTaskInstance();
- if (taskInstance != null) {
- message.setLongProperty("taskInstanceId", taskInstance.getId());
- }
-
- modifyMessage(message, job);
messageProducer.send(message);
}
- catch (JMSException e) {
- throw new JbpmException("could not send jms message", e);
+ finally {
+ messageProducer.close();
}
}
@@ -115,49 +113,21 @@
* asynchronous continuations.
*/
protected void modifyMessage(Message message, Job job) throws JMSException {
+ message.setLongProperty("jobId", job.getId());
+
+ if (job.isExclusive()) {
+ message.setStringProperty(GROUP_ID_PROP, GROUP_PREFIX + job.getProcessInstance().getId());
+ }
}
public void close() {
+ // there is no need to close the sessions and producers of a closed connection
try {
- messageProducer.close();
- }
- catch (JMSException e) {
- log.warn("could not close message producer", e);
- }
-
- JMSException commitException = null;
- if (isCommitEnabled) {
- TxService txService = (TxService) Services.getCurrentService(Services.SERVICENAME_TX);
- try {
- if (txService.isRollbackOnly()) {
- session.rollback();
- }
- else {
- session.commit();
- }
- }
- catch (JMSException e) {
- commitException = e;
- }
- }
-
- try {
- session.close();
- }
- catch (JMSException e) {
- log.warn("could not close jms session", e);
- }
-
- try {
connection.close();
}
catch (JMSException e) {
- log.warn("could not close jms connection", e);
+ throw new JbpmException("could not close jms connection", e);
}
-
- if (commitException != null) {
- throw new JbpmException("could not commit jms session", commitException);
- }
}
public Session getSession() {
@@ -165,6 +135,6 @@
}
protected MessageProducer getMessageProducer() throws JMSException {
- return messageProducer;
+ return getSession().createProducer(serviceFactory.getDestination());
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -25,10 +25,13 @@
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.jbpm.JbpmConfiguration;
+import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.ejb.impl.JobListenerBean;
import org.jbpm.svc.Service;
import org.jbpm.svc.ServiceFactory;
+import org.jbpm.svc.Services;
import org.jbpm.util.JndiUtil;
/**
@@ -42,7 +45,6 @@
* <ul>
* <li><code>connectionFactoryJndiName</code></li>
* <li><code>destinationJndiName</code></li>
- * <li><code>isCommitEnabled</code></li>
* </ul>
*
* Refer to the jBPM manual for details.
@@ -57,11 +59,22 @@
String connectionFactoryJndiName = "java:comp/env/jms/JbpmConnectionFactory";
String destinationJndiName = "java:comp/env/jms/JobQueue";
- boolean isCommitEnabled;
-
+ private JbpmConfiguration jbpmConfiguration;
private ConnectionFactory connectionFactory;
private Destination destination;
+ public JbpmConfiguration getJbpmConfiguration() {
+ // if this field was not injected
+ if (jbpmConfiguration == null) {
+ // set to current context
+ JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+ assert this == jbpmContext.getServiceFactory(Services.SERVICENAME_MESSAGE) : jbpmContext.getServiceFactory(Services.SERVICENAME_MESSAGE);
+ jbpmConfiguration = jbpmContext.getJbpmConfiguration();
+ }
+ return jbpmConfiguration;
+ }
+
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = (ConnectionFactory) JndiUtil.lookup(connectionFactoryJndiName, ConnectionFactory.class);
@@ -76,8 +89,12 @@
return destination;
}
+ /**
+ * @deprecated the EJB container manages the transactional enlistment of JMS sessions
+ * @return <code>false</code>
+ */
public boolean isCommitEnabled() {
- return isCommitEnabled;
+ return false;
}
public Service openService() {
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -39,7 +39,7 @@
public Service openService() {
try {
Connection connection = getConnectionFactory().createConnection();
- return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled);
+ return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled());
}
catch (JMSException e) {
throw new JbpmException("couldn't open message session", e);
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EjbSchedulerService.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -5,10 +5,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.hibernate.Session;
+
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;
import org.jbpm.job.Timer;
@@ -20,39 +19,34 @@
*/
public class EjbSchedulerService implements SchedulerService {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private JobSession jobSession;
- private Session session;
+ private JbpmContext jbpmContext;
private LocalTimerService timerService;
public EjbSchedulerService(LocalTimerServiceHome timerServiceHome) {
- JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext == null) {
- throw new JbpmException("instantiation of the EjbSchedulerService requires a current JbpmContext");
- }
- this.jobSession = jbpmContext.getJobSession();
- this.session = jbpmContext.getSession();
+ jbpmContext = JbpmContext.getCurrentJbpmContext();
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
try {
timerService = timerServiceHome.create();
}
catch (CreateException e) {
- throw new JbpmException("ejb local timer creation problem", e);
+ throw new JbpmException("failed to create local timer service", e);
}
}
public void createTimer(Timer timer) {
log.debug("creating " + timer);
- jobSession.saveJob(timer);
- session.flush();
+ jbpmContext.getJobSession().saveJob(timer);
+ jbpmContext.getSession().flush();
timerService.createTimer(timer);
}
public void deleteTimer(Timer timer) {
if (log.isDebugEnabled()) log.debug("deleting " + timer);
timerService.cancelTimer(timer);
- jobSession.deleteJob(timer);
+ jbpmContext.getJobSession().deleteJob(timer);
}
public void deleteTimersByName(String timerName, Token token) {
@@ -60,18 +54,17 @@
log.debug("deleting timers by name '" + timerName + "' for " + token);
}
timerService.cancelTimersByName(timerName, token);
- jobSession.deleteTimersByName(timerName, token);
+ jbpmContext.getJobSession().deleteTimersByName(timerName, token);
}
public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
if (log.isDebugEnabled()) log.debug("deleting timers for " + processInstance);
timerService.cancelTimersForProcessInstance(processInstance);
- jobSession.deleteJobsForProcessInstance(processInstance);
+ jbpmContext.getJobSession().deleteJobsForProcessInstance(processInstance);
}
public void close() {
try {
- if (log.isDebugEnabled()) log.debug("removing the timer service session bean");
timerService.remove();
}
catch (RemoveException e) {
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise/src/main/java/org/jbpm/scheduler/ejbtimer/EntitySchedulerService.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -7,10 +7,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.hibernate.Session;
+
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
-import org.jbpm.db.JobSession;
import org.jbpm.ejb.LocalTimerEntity;
import org.jbpm.ejb.LocalTimerEntityHome;
import org.jbpm.graph.exe.ProcessInstance;
@@ -20,26 +19,22 @@
public class EntitySchedulerService implements SchedulerService {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- JobSession jobSession;
- Session session;
- LocalTimerEntityHome timerEntityHome;
+ private JbpmContext jbpmContext;
+ private LocalTimerEntityHome timerEntityHome;
public EntitySchedulerService(LocalTimerEntityHome timerEntityHome) {
- JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext == null) {
- throw new JbpmException("entity scheduler service must be created inside a jbpm context");
- }
- this.jobSession = jbpmContext.getJobSession();
- this.session = jbpmContext.getSession();
+ jbpmContext = JbpmContext.getCurrentJbpmContext();
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
this.timerEntityHome = timerEntityHome;
}
public void createTimer(Timer timer) {
if (log.isDebugEnabled()) log.debug("creating " + timer);
- jobSession.saveJob(timer);
- session.flush();
+ jbpmContext.getJobSession().saveJob(timer);
+ jbpmContext.getSession().flush();
+
try {
LocalTimerEntity timerEntity = timerEntityHome.findByPrimaryKey(new Long(timer.getId()));
timerEntity.createTimer(timer);
@@ -58,7 +53,7 @@
catch (FinderException e) {
log.error("failed to retrieve entity for " + timer, e);
}
- jobSession.deleteJob(timer);
+ jbpmContext.getJobSession().deleteJob(timer);
}
public void deleteTimersByName(String timerName, Token token) {
@@ -76,7 +71,7 @@
catch (FinderException e) {
log.error("failed to retrieve timer entities by name '" + timerName + "' for " + token, e);
}
- jobSession.deleteTimersByName(timerName, token);
+ jbpmContext.getJobSession().deleteTimersByName(timerName, token);
}
public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
@@ -93,11 +88,11 @@
catch (FinderException e) {
log.error("failed to retrieve timer entities for " + processInstance, e);
}
- jobSession.deleteJobsForProcessInstance(processInstance);
+ jbpmContext.getJobSession().deleteJobsForProcessInstance(processInstance);
}
public void close() {
- timerEntityHome = null;
+ // nothing to do here
}
private static final Log log = LogFactory.getLog(EntitySchedulerService.class);
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/etc/jbpm.cfg.xml 2010-11-23 13:45:16 UTC (rev 6838)
@@ -2,35 +2,31 @@
<jbpm-context>
<service name="persistence" factory="org.jbpm.persistence.jta.JtaDbPersistenceServiceFactory" />
- <service name="message">
- <factory>
- <ref bean="jbpm.jms.connector.command.service" />
- </factory>
- </service>
- <service name="scheduler">
- <factory>
- <ref bean="jbpm.jms.connector.job.service" />
- </factory>
- </service>
+ <service name="message">
+ <factory>
+ <ref bean="jbpm.jms.connector.job.service" />
+ </factory>
+ </service>
+ <service name="scheduler">
+ <factory>
+ <ref bean="jbpm.jms.connector.job.service" />
+ </factory>
+ </service>
<service name="tx" factory="org.jbpm.tx.TxServiceFactory" />
<service name="logging" factory="org.jbpm.logging.db.DbLoggingServiceFactory" />
<service name="authentication" factory="org.jbpm.security.authentication.DefaultAuthenticationServiceFactory" />
</jbpm-context>
- <bean name="jbpm.jms.connector.command.service" class="org.jbpm.jms.JmsConnectorServiceFactory">
- <field name="jbpmConfiguration">
- <ref bean="jbpm.configuration" />
- </field>
- <property name="connectionFactoryJndiName"><string value="java:JmsXA"/></property>
- <property name="destinationJndiName"><string value="queue/JbpmJobQueue"/></property>
- </bean>
-
<bean name="jbpm.jms.connector.job.service" class="org.jbpm.jms.JmsConnectorServiceFactory">
- <field name="jbpmConfiguration">
+ <field name="jbpmConfiguration">
<ref bean="jbpm.configuration" />
</field>
- <property name="connectionFactoryJndiName"><string value="java:JmsXA"/></property>
- <property name="destinationJndiName"><string value="queue/JbpmJobQueue"/></property>
+ <property name="connectionFactoryJndiName">
+ <string value="java:JmsXA" />
+ </property>
+ <property name="destinationJndiName">
+ <string value="queue/JbpmJobQueue" />
+ </property>
</bean>
<!-- employ the context class loader -->
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/ExecuteJobCommand.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -92,7 +92,7 @@
Timer timer = jbpmContext.getJobSession().loadTimer(job.getId());
JmsConnectorService schedulerService = (JmsConnectorService) jbpmContext.getServices()
.getSchedulerService();
- schedulerService.sendWithoutSaving(timer);
+ schedulerService.sendMessage(timer);
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java 2010-11-21 13:43:03 UTC (rev 6837)
+++ jbpm3/branches/jbpm-3.2-soa/enterprise-jee5/src/main/java/org/jbpm/jms/JmsConnectorService.java 2010-11-23 13:45:16 UTC (rev 6838)
@@ -21,9 +21,6 @@
*/
package org.jbpm.jms;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -32,7 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jbpm.JbpmContext;
+
import org.jbpm.JbpmException;
import org.jbpm.db.JobSession;
import org.jbpm.graph.exe.ProcessInstance;
@@ -43,56 +40,48 @@
import org.jbpm.scheduler.SchedulerService;
public class JmsConnectorService implements MessageService, SchedulerService {
-
- private final JobSession jobSession;
- private final JmsConnectorServiceFactory factory;
- private static final String JMS_PROP_SCHEDULED = "JMS_JBOSS_SCHEDULED_DELIVERY";
- private static final String JMS_PROP_GROUP = "JMSXGroupID";
- private static final String JBPM_GROUP_PREFIX = "jBPMPID";
-
- private static final DateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at' HH:mm:ss z");
- private static final long serialVersionUID = 1L;
+ private final JmsConnectorServiceFactory serviceFactory;
+ private final Connection connection;
+
+ private static final String SCHEDULED_DELIVERY_PROP = "JMS_JBOSS_SCHEDULED_DELIVERY";
+ private static final String GROUP_ID_PROP = "JMSXGroupID";
+ private static final String GROUP_PREFIX = "jBPMPID";
+ private static final long serialVersionUID = 2L;
+
private static final Log log = LogFactory.getLog(JmsConnectorService.class);
-
- public JmsConnectorService(JmsConnectorServiceFactory factory) throws JMSException {
- JbpmContext jbpmContext = factory.getJbpmConfiguration().getCurrentJbpmContext();
- if (jbpmContext == null) throw new JbpmException("no active jbpm context");
- jobSession = jbpmContext.getJobSession();
-
- this.factory = factory;
+
+ JmsConnectorService(JmsConnectorServiceFactory serviceFactory) throws JMSException {
+ connection = serviceFactory.getConnectionFactory().createConnection();
+ this.serviceFactory = serviceFactory;
}
+ private JobSession getJobSession() {
+ return serviceFactory.getJbpmConfiguration().getCurrentJbpmContext().getJobSession();
+ }
+
public void send(Job job) {
- jobSession.saveJob(job);
+ getJobSession().saveJob(job);
try {
- sendWithoutSaving(job);
+ sendMessage(job);
}
catch (JMSException e) {
throw new JbpmException("failed to send job message", e);
}
}
- void sendWithoutSaving(Job job) throws JMSException {
- Connection connection = factory.getConnectionFactory().createConnection();
+ final void sendMessage(Job job) throws JMSException {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Message message = session.createMessage();
populateMessage(message, job);
- MessageProducer messageProducer = session.createProducer(factory.getDestination());
+ MessageProducer messageProducer = session.createProducer(serviceFactory.getDestination());
messageProducer.send(message);
}
finally {
- // there is no need to close the sessions and producers of a closed connection
- // http://download.oracle.com/javaee/1.4/api/javax/jms/Connection.html#close()
- try {
- connection.close();
- }
- catch (JMSException e) {
- log.warn("failed to close jms connection", e);
- }
+ // there is no need to close the producers of a closed session
+ session.close();
}
}
@@ -101,21 +90,17 @@
if (job instanceof Timer) {
Timer timer = (Timer) job;
-
- if(log.isDebugEnabled()) {
- log.debug("Creating timer for job: "+job.getId()+" to execute at: "+(timer.getDueDate() == null ? "no set date" : sdf.format(timer.getDueDate())));
+
+ if (log.isDebugEnabled()) {
+ log.debug("scheduling " + timer + " to execute at " + timer.getDueDate());
}
- message.setLongProperty(JMS_PROP_SCHEDULED, timer.getDueDate().getTime());
- //higher priority for timers.
+ message.setLongProperty(SCHEDULED_DELIVERY_PROP, timer.getDueDate().getTime());
+ // raise timer priority
message.setJMSPriority(9);
}
-
- if(!(job instanceof Timer))
- {
- if(job.isExclusive())
- {
- message.setStringProperty(JMS_PROP_GROUP, JBPM_GROUP_PREFIX+job.getProcessInstance().getId());
- }
+
+ if (job.isExclusive()) {
+ message.setStringProperty(GROUP_ID_PROP, GROUP_PREFIX + job.getProcessInstance().getId());
}
}
@@ -124,18 +109,23 @@
}
public void deleteTimer(Timer timer) {
- jobSession.deleteJob(timer);
+ getJobSession().deleteJob(timer);
}
public void deleteTimersByName(String timerName, Token token) {
- jobSession.deleteTimersByName(timerName, token);
+ getJobSession().deleteTimersByName(timerName, token);
}
public void deleteTimersByProcessInstance(ProcessInstance processInstance) {
- jobSession.deleteJobsForProcessInstance(processInstance);
+ getJobSession().deleteJobsForProcessInstance(processInstance);
}
public void close() {
- //nothing to do...
+ try {
+ connection.close();
+ }
+ catch (JMSException e) {
+ throw new JbpmException("failed to close jms connection", e);
+ }
}
}
More information about the jbpm-commits
mailing list