[jbpm-commits] JBoss JBPM SVN: r5441 - in jbpm3/branches/jbpm-3.2-soa/modules: core/src/main/java/org/jbpm/persistence/db and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 7 04:54:14 EDT 2009
Author: alex.guizar at jboss.com
Date: 2009-08-07 04:54:13 -0400 (Fri, 07 Aug 2009)
New Revision: 5441
Modified:
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java
jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml
Log:
[JBPM-2472] Make JMS session creation configurable
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/msg/db/DbMessageService.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -31,38 +31,34 @@
import org.jbpm.msg.MessageService;
public class DbMessageService implements MessageService {
-
+
private static final long serialVersionUID = 1L;
- JobSession jobSession = null;
- JobExecutor jobExecutor = null;
- boolean hasProducedJobs = false;
-
+ final JobSession jobSession;
+ final JobExecutor jobExecutor;
+ boolean hasProducedJobs;
+
public DbMessageService() {
JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext==null) {
- throw new JbpmException("instantiation of the DbMessageService requires a current JbpmContext");
- }
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+
jobSession = jbpmContext.getJobSession();
jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
}
public void send(Job job) {
jobSession.saveJob(job);
- log.debug("saved "+job);
hasProducedJobs = true;
}
public void close() {
- if ( (hasProducedJobs)
- && (jobExecutor!=null)
- ) {
- log.debug("messages were produced, job executor will be signalled");
- synchronized(jobExecutor) {
+ if (hasProducedJobs && jobExecutor != null) {
+ log.debug("messages were produced, job executor will be notified");
+ synchronized (jobExecutor) {
jobExecutor.notify();
}
}
}
-
+
private static Log log = LogFactory.getLog(DbMessageService.class);
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/db/DbPersistenceService.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -58,7 +58,7 @@
protected boolean mustConnectionBeClosed;
protected Transaction transaction;
- protected boolean isTransactionEnabled = true;
+ protected boolean isTransactionEnabled;
protected boolean isCurrentSessionEnabled;
protected Session session;
@@ -156,10 +156,11 @@
public Connection getConnection(boolean resolveSession) {
if (connection == null) {
- if (persistenceServiceFactory.getDataSource() != null) {
+ DataSource dataSource = persistenceServiceFactory.getDataSource();
+ if (dataSource != null) {
try {
- log.debug("establishing connneciton to data source");
- connection = persistenceServiceFactory.getDataSource().getConnection();
+ log.debug("establishing connection to data source");
+ connection = dataSource.getConnection();
mustConnectionBeClosed = true;
}
catch (SQLException e) {
@@ -257,7 +258,7 @@
session.flush();
}
catch (HibernateException e) {
- // avoid log and throw antipatternf
+ // avoid log and throw antipattern
return e;
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/persistence/jta/JtaDbPersistenceServiceFactory.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -30,14 +30,13 @@
import org.jbpm.svc.Service;
/**
- * The JTA persistence service enables jBPM to participate in JTA transactions.
- * If an existing transaction is underway, {@link JtaDbPersistenceService}
- * clings to it; otherwise it starts a new transaction.
+ * The JTA persistence service enables jBPM to participate in JTA transactions. If an existing
+ * transaction is underway, {@link JtaDbPersistenceService} clings to it; otherwise it starts a new
+ * transaction.
*
* <h3>Configuration</h3>
*
- * The JTA persistence service factory has the configurable fields described
- * below.
+ * The JTA persistence service factory has the configurable fields described below.
*
* <ul>
* <li><code>isCurrentSessionEnabled</code></li>
@@ -47,46 +46,44 @@
* Refer to the jBPM manual for details.
*
* @author Tom Baeyens
+ * @author Alejandro Guizar
*/
-public class JtaDbPersistenceServiceFactory extends DbPersistenceServiceFactory
-{
+public class JtaDbPersistenceServiceFactory extends DbPersistenceServiceFactory {
private static final long serialVersionUID = 1L;
private UserTransaction userTransaction;
- public JtaDbPersistenceServiceFactory()
- {
+ public JtaDbPersistenceServiceFactory() {
setCurrentSessionEnabled(true);
setTransactionEnabled(false);
}
- public Service openService()
- {
+ public Service openService() {
return new JtaDbPersistenceService(this);
}
- public UserTransaction getUserTransaction()
- {
- if (userTransaction == null)
- {
+ public UserTransaction getUserTransaction() {
+ if (userTransaction == null) {
String jndiName = getConfiguration().getProperty("jta.UserTransaction");
- if (jndiName == null)
- {
+ if (jndiName == null) {
/*
- * EJB 2.1 section 20.9 The container must make the UserTransaction interface available to the enterprise beans that are allowed to use this interface (only
- * session and message- driven beans with bean-managed transaction demarcation are allowed to use this interface) in JNDI under the name
- * java:comp/UserTransaction. J2EE 1.4 section 4.2.1.1 The J2EE platform must provide an object implementing the UserTransaction interface to all web
- * components. The platform must publish the UserTransaction object in JNDI under the name java:comp/UserTransaction.
+ * EJB 2.1 section 20.9 The container must make the UserTransaction interface available to
+ * the enterprise beans that are allowed to use this interface (only session and message-
+ * driven beans with bean-managed transaction demarcation are allowed to use this interface)
+ * in JNDI under the name java:comp/UserTransaction.
*/
+ /*
+ * J2EE 1.4 section 4.2.1.1 The J2EE platform must provide an object implementing the
+ * UserTransaction interface to all web components. The platform must publish the
+ * UserTransaction object in JNDI under the name java:comp/UserTransaction.
+ */
jndiName = "java:comp/UserTransaction";
}
- try
- {
- userTransaction = (UserTransaction)new InitialContext().lookup(jndiName);
+ try {
+ userTransaction = (UserTransaction) new InitialContext().lookup(jndiName);
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
throw new JbpmException("could not retrieve user transaction with name " + jndiName, e);
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/core/src/main/java/org/jbpm/scheduler/db/DbSchedulerService.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -37,20 +37,19 @@
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(DbSchedulerService.class);
-
- JobSession jobSession = null;
- JobExecutor jobExecutor = null;
- boolean hasProducedJobs = false;
+ final JobSession jobSession;
+ final JobExecutor jobExecutor;
+ boolean hasProducedJobs;
+
public DbSchedulerService() {
JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext==null) {
- throw new JbpmException("instantiation of the DbSchedulerService requires a current JbpmContext");
- }
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+
this.jobSession = jbpmContext.getJobSession();
this.jobExecutor = jbpmContext.getJbpmConfiguration().getJobExecutor();
}
-
+
public void createTimer(Timer timerJob) {
jobSession.saveJob(timerJob);
hasProducedJobs = true;
@@ -70,7 +69,7 @@
public void close() {
if (hasProducedJobs && jobExecutor != null) {
- log.debug("timers were produced, job executor will be signalled");
+ log.debug("timers were produced, job executor will be notified");
synchronized (jobExecutor) {
jobExecutor.notify();
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/ejb/impl/CommandListenerBean.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -46,18 +46,19 @@
import org.jbpm.ejb.LocalCommandServiceHome;
/**
- * This message-driven bean listens for {@link ObjectMessage object messages}
- * containing a command instance. The received commands are
- * executed by the {@link CommandServiceBean command service} bean, using the
- * local interface.
+ * This message-driven bean listens for {@link ObjectMessage object messages} containing a command
+ * instance. The received commands are executed by the {@link CommandServiceBean command service}
+ * bean, using the local interface.
*
- * The body of the message must be a Java object that implements the {@link
- * Command} interface. The message properties, if any, are ignored.
+ * The body of the message must be a Java object that implements the {@link Command} interface. The
+ * message properties, if any, are ignored.
*
* <h3>Environment</h3>
*
- * <p>The environment entries and resources available for customization are
- * summarized in the table below.</p>
+ * <p>
+ * The environment entries and resources available for customization are summarized in the table
+ * below.
+ * </p>
*
* <table border="1">
* <tr>
@@ -68,50 +69,41 @@
* <tr>
* <td><code>ejb/LocalCommandServiceBean</code></td>
* <td>EJB Reference</td>
- * <td>Link to the local {@linkplain CommandServiceBean session bean} that
- * executes commands on a separate jBPM context.
- * </td>
+ * <td>Link to the local {@linkplain CommandServiceBean session bean} that executes commands on a
+ * separate jBPM context.</td>
* </tr>
* <tr>
* <td><code>jms/JbpmConnectionFactory</code></td>
* <td>Resource Manager Reference</td>
- * <td>Logical name of the factory that provides JMS connections for producing
- * result messages. Required for command messages that indicate a reply
- * destination.
- * </td>
+ * <td>Logical name of the factory that provides JMS connections for producing result messages.
+ * Required for command messages that indicate a reply destination.</td>
* </tr>
* <tr>
* <td><code>jms/DeadLetterQueue</code></td>
* <td>Message Destination Reference</td>
- * <td>Messages which do not contain a command are sent to the queue referenced
- * here. Optional; if absent, such messages are rejected, which may cause the
- * container to redeliver.
- * </td>
+ * <td>Messages which do not contain a command are sent to the queue referenced here. Optional; if
+ * absent, such messages are rejected, which may cause the container to redeliver.</td>
* </tr>
* </table>
*
* @author Jim Rigsbee
- * @author Tom Baeyens
* @author Alejandro Guizar
+ * @author Tom Baeyens
*/
-public class CommandListenerBean implements MessageDrivenBean, MessageListener
-{
+public class CommandListenerBean implements MessageDrivenBean, MessageListener {
private static final long serialVersionUID = 1L;
- MessageDrivenContext messageDrivenContext = null;
+ MessageDrivenContext messageDrivenContext;
LocalCommandService commandService;
Connection jmsConnection;
Destination deadLetterQueue;
- public void onMessage(Message message)
- {
- try
- {
+ public void onMessage(Message message) {
+ try {
// extract command from message
Command command = extractCommand(message);
- if (command == null)
- {
+ if (command == null) {
discard(message);
return;
}
@@ -119,54 +111,42 @@
Object result = commandService.execute(command);
// send a response back if a "reply to" destination is set
Destination replyTo = message.getJMSReplyTo();
- if (replyTo != null && (result instanceof Serializable || result == null))
- {
- sendResult((Serializable)result, replyTo, message.getJMSMessageID());
+ if (replyTo != null && (result instanceof Serializable || result == null)) {
+ sendResult((Serializable) result, replyTo, message.getJMSMessageID());
}
}
- catch (JMSException e)
- {
+ catch (JMSException e) {
messageDrivenContext.setRollbackOnly();
log.error("could not process message " + message, e);
}
}
- protected Command extractCommand(Message message) throws JMSException
- {
- Command command = null;
- if (message instanceof ObjectMessage)
- {
+ protected Command extractCommand(Message message) throws JMSException {
+ if (message instanceof ObjectMessage) {
log.debug("deserializing command from jms message...");
- ObjectMessage objectMessage = (ObjectMessage)message;
+ ObjectMessage objectMessage = (ObjectMessage) message;
Serializable object = objectMessage.getObject();
- if (object instanceof Command)
- {
- command = (Command)object;
+ if (object instanceof Command) {
+ return (Command) object;
}
- else
- {
- log.warn("ignoring object message cause it isn't a command '" + object + "'" + (object != null ? " (" + object.getClass().getName() + ")" : ""));
+ else {
+ log.warn("not a command: " + object);
}
}
- else
- {
- log.warn("ignoring message '" + message + "' cause it isn't an ObjectMessage (" + message.getClass().getName() + ")");
+ else {
+ log.warn("not an object message: " + message);
}
- return command;
+ return null;
}
- private void discard(Message message) throws JMSException
- {
- if (deadLetterQueue == null)
- {
+ private void discard(Message message) throws JMSException {
+ if (deadLetterQueue == null) {
// lookup dead letter queue
- try
- {
+ try {
Context initial = new InitialContext();
- deadLetterQueue = (Destination)initial.lookup("java:comp/env/jms/DeadLetterQueue");
+ deadLetterQueue = (Destination) initial.lookup("java:comp/env/jms/DeadLetterQueue");
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
log.debug("failed to retrieve dead letter queue, rejecting message: " + message);
messageDrivenContext.setRollbackOnly();
return;
@@ -174,70 +154,58 @@
}
// send message to dead letter queue
Session jmsSession = createSession();
- try
- {
+ try {
jmsSession.createProducer(deadLetterQueue).send(message);
}
- finally
- {
+ finally {
jmsSession.close();
}
}
- private void sendResult(Serializable result, Destination destination, String correlationId) throws JMSException
- {
+ private void sendResult(Serializable result, Destination destination, String correlationId)
+ throws JMSException {
log.debug("sending result " + result + " to " + destination);
Session jmsSession = createSession();
- try
- {
+ try {
Message resultMessage = jmsSession.createObjectMessage(result);
resultMessage.setJMSCorrelationID(correlationId);
jmsSession.createProducer(destination).send(resultMessage);
}
- finally
- {
+ finally {
jmsSession.close();
}
}
- private Session createSession() throws JMSException
- {
- if (jmsConnection == null)
- {
+ private Session createSession() throws JMSException {
+ if (jmsConnection == null) {
// lookup factory and create jms connection
- try
- {
+ try {
Context initial = new InitialContext();
- ConnectionFactory jmsConnectionFactory = (ConnectionFactory)initial.lookup("java:comp/env/jms/JbpmConnectionFactory");
+ ConnectionFactory jmsConnectionFactory = (ConnectionFactory) initial.lookup("java:comp/env/jms/JbpmConnectionFactory");
jmsConnection = jmsConnectionFactory.createConnection();
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
throw new EJBException("error retrieving jms connection factory", e);
}
}
/*
- * if the connection supports xa, the session will be transacted, else the session will auto acknowledge; in either case no explicit transaction control must be
- * performed - see ejb 2.1 - 17.3.5
+ * if the connection supports xa, the session will be transacted, else the session will auto
+ * acknowledge; in either case no explicit transaction control must be performed - see ejb 2.1 -
+ * 17.3.5
*/
return jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext)
- {
+ public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
this.messageDrivenContext = messageDrivenContext;
}
- public void ejbRemove()
- {
- if (jmsConnection != null)
- {
- try
- {
+ public void ejbRemove() {
+ if (jmsConnection != null) {
+ try {
jmsConnection.close();
}
- catch (JMSException e)
- {
+ catch (JMSException e) {
log.debug("failed to close jms connection", e);
}
jmsConnection = null;
@@ -247,20 +215,16 @@
messageDrivenContext = null;
}
- public void ejbCreate()
- {
- try
- {
+ public void ejbCreate() {
+ try {
Context initial = new InitialContext();
- LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome)initial.lookup("java:comp/env/ejb/LocalCommandServiceBean");
+ LocalCommandServiceHome commandServiceHome = (LocalCommandServiceHome) initial.lookup("java:comp/env/ejb/LocalCommandServiceBean");
commandService = commandServiceHome.create();
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
throw new EJBException("error retrieving command service home", e);
}
- catch (CreateException e)
- {
+ catch (CreateException e) {
throw new EJBException("error creating command service", e);
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageService.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -28,119 +28,136 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.jbpm.JbpmContext;
import org.jbpm.JbpmException;
import org.jbpm.db.JobSession;
+import org.jbpm.graph.exe.Token;
import org.jbpm.job.Job;
import org.jbpm.msg.MessageService;
+import org.jbpm.svc.Services;
+import org.jbpm.taskmgmt.exe.TaskInstance;
+import org.jbpm.tx.TxService;
public class JmsMessageService implements MessageService {
private static final long serialVersionUID = 1L;
- JobSession jobSession = null;
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageProducer messageProducer = null;
- boolean isCommitEnabled = false;
+ private static final Log log = LogFactory.getLog(JmsMessageService.class);
- public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled) throws JMSException {
+ final JobSession jobSession;
+
+ final Connection connection;
+ final Session session;
+ final MessageProducer messageProducer;
+
+ final boolean isCommitEnabled;
+
+ /**
+ * @deprecated use {@link #JmsMessageService(JmsMessageServiceFactory)} instead
+ */
+ public JmsMessageService(Connection connection, Destination destination, boolean isCommitEnabled)
+ throws JMSException {
JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
- if (jbpmContext==null) {
- throw new JbpmException("jms message service must be created inside a jbpm context");
+ if (jbpmContext == null) throw new JbpmException("no active jbpm context");
+ jobSession = jbpmContext.getJobSession();
+
+ this.connection = connection;
+
+ if (isCommitEnabled) {
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ this.isCommitEnabled = true;
}
- this.jobSession = jbpmContext.getJobSession();
+ else {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.isCommitEnabled = false;
+ }
- this.connection = connection;
- this.destination = destination;
- this.isCommitEnabled = isCommitEnabled;
- /*
- * If the connection supports XA, the session will always take part in the global transaction.
- * Otherwise the first parameter specifies whether message productions and consumptions
- * are part of a single transaction (TRUE) or performed immediately (FALSE).
- * Messages are never meant to be received before the database transaction commits,
- * hence the transacted is preferable.
- */
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ messageProducer = session.createProducer(destination);
}
+ public JmsMessageService(JmsMessageServiceFactory factory) throws JMSException {
+ this(factory.getConnectionFactory().createConnection(), factory.getDestination(),
+ factory.isCommitEnabled());
+ }
+
public void send(Job job) {
+ jobSession.saveJob(job);
+
try {
- jobSession.saveJob(job);
-
Message message = session.createMessage();
message.setLongProperty("jobId", job.getId());
- if (job.getToken()!=null) {
- message.setLongProperty("tokenId", job.getToken().getId());
- }
- if (job.getProcessInstance()!=null) {
+
+ Token token = job.getToken();
+ if (token != null) {
+ message.setLongProperty("tokenId", token.getId());
message.setLongProperty("processInstanceId", job.getProcessInstance().getId());
}
- if (job.getTaskInstance()!=null) {
- message.setLongProperty("taskInstanceId", job.getTaskInstance().getId());
+
+ TaskInstance taskInstance = job.getTaskInstance();
+ if (taskInstance != null) {
+ message.setLongProperty("taskInstanceId", taskInstance.getId());
}
+
modifyMessage(message, job);
- getMessageProducer().send(message);
- } catch (JMSException e) {
- throw new JbpmException("couldn't send jms message", e);
+ messageProducer.send(message);
}
+ catch (JMSException e) {
+ throw new JbpmException("could not send jms message", e);
+ }
}
/**
- * Hook to modify the message, e.g. adding additional properties
- * to the header required by the own application. One possible
- * use case is to rescue the actor id over the "JMS" intermezzo
- * of asynchronous continuations.
+ * Hook to modify the message, e.g. adding extra properties to the header required by the own
+ * application. One possible use case is to rescue the actor id over the "JMS" intermezzo of
+ * asynchronous continuations.
*/
- public void modifyMessage(Message message, Job job) throws JMSException {
+ protected void modifyMessage(Message message, Job job) throws JMSException {
}
public void close() {
- JbpmException exception = null;
+ try {
+ messageProducer.close();
+ }
+ catch (JMSException e) {
+ log.warn("could not close message producer", e);
+ }
- if (messageProducer!=null) {
+ JMSException commitException = null;
+ if (isCommitEnabled) {
+ TxService txService = (TxService) Services.getCurrentService(Services.SERVICENAME_TX);
try {
- messageProducer.close();
- } catch (Exception e) {
- // NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
- exception = new JbpmException("couldn't close message producer", e);
- }
- }
-
- if (session!=null) {
- if (isCommitEnabled) {
- try {
+ if (txService.isRollbackOnly()) {
+ session.rollback();
+ }
+ else {
session.commit();
- } catch (Exception e) {
- if (exception==null) {
- exception = new JbpmException("couldn't commit JMS session", e);
- }
}
}
-
- try {
- session.close();
- } catch (Exception e) {
- if (exception==null) {
- exception = new JbpmException("couldn't close JMS session", e);
- }
+ catch (JMSException e) {
+ commitException = e;
}
}
- if (connection!=null) {
- try {
- connection.close();
- } catch (Exception e) {
- if (exception==null) {
- exception = new JbpmException("couldn't close JMS connection", e);
- }
- }
+ try {
+ session.close();
}
+ catch (JMSException e) {
+ log.warn("could not close jms session", e);
+ }
- if (exception!=null) {
- throw exception;
+ try {
+ connection.close();
}
+ catch (JMSException e) {
+ log.warn("could not close jms connection", e);
+ }
+
+ if (commitException != null) {
+ throw new JbpmException("could not commit jms session", commitException);
+ }
}
public Session getSession() {
@@ -148,9 +165,6 @@
}
protected MessageProducer getMessageProducer() throws JMSException {
- if (messageProducer==null) {
- messageProducer = session.createProducer(destination);
- }
return messageProducer;
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactory.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -21,7 +21,6 @@
*/
package org.jbpm.msg.jms;
-import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -35,9 +34,8 @@
import org.jbpm.svc.ServiceFactory;
/**
- * The JMS message service leverages the reliable communication infrastructure
- * accessed through JMS interfaces to deliver asynchronous continuation
- * messages to the {@link JobListenerBean}.
+ * The JMS message service leverages the reliable communication infrastructure available through JMS
+ * interfaces to deliver asynchronous continuation messages to the {@link JobListenerBean}.
*
* <h3>Configuration</h3>
*
@@ -54,103 +52,68 @@
* @author Tom Baeyens
* @author Alejandro Guizar
*/
-public class JmsMessageServiceFactory implements ServiceFactory
-{
+public class JmsMessageServiceFactory implements ServiceFactory {
+
private static final long serialVersionUID = 1L;
String connectionFactoryJndiName = "java:comp/env/jms/JbpmConnectionFactory";
String destinationJndiName = "java:comp/env/jms/JobQueue";
- String commandDestinationJndiName = "java:comp/env/jms/CommandQueue";
- boolean isCommitEnabled = false;
+ boolean isCommitEnabled;
+
private ConnectionFactory connectionFactory;
private Destination destination;
- private Destination commandDestination;
- public ConnectionFactory getConnectionFactory()
- {
- if (connectionFactory == null)
- {
- try
- {
- connectionFactory = (ConnectionFactory)lookup(connectionFactoryJndiName);
+ public ConnectionFactory getConnectionFactory() {
+ if (connectionFactory == null) {
+ try {
+ connectionFactory = (ConnectionFactory) lookup(connectionFactoryJndiName);
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
throw new JbpmException("could not retrieve message connection factory", e);
}
}
return connectionFactory;
}
- public Destination getDestination()
- {
- if (destination == null)
- {
- try
- {
- destination = (Destination)lookup(destinationJndiName);
+ public Destination getDestination() {
+ if (destination == null) {
+ try {
+ destination = (Destination) lookup(destinationJndiName);
}
- catch (NamingException e)
- {
+ catch (NamingException e) {
throw new JbpmException("could not retrieve job destination", e);
}
}
return destination;
}
- public Destination getCommandDestination()
- {
- if (commandDestination == null)
- {
- try
- {
- commandDestination = (Destination)lookup(commandDestinationJndiName);
- }
- catch (NamingException e)
- {
- throw new JbpmException("could not retrieve command destination", e);
- }
- }
- return commandDestination;
- }
-
- public boolean isCommitEnabled()
- {
+ public boolean isCommitEnabled() {
return isCommitEnabled;
}
- private static Object lookup(String name) throws NamingException
- {
+ private static Object lookup(String name) throws NamingException {
Context initial = new InitialContext();
- try
- {
+ try {
return initial.lookup(name);
}
- finally
- {
+ finally {
initial.close();
}
}
- public Service openService()
- {
- try
- {
- Connection connection = getConnectionFactory().createConnection();
- return new JmsMessageService(connection, getDestination(), isCommitEnabled);
+ public Service openService() {
+ try {
+ return new JmsMessageService(this);
}
- catch (JMSException e)
- {
- throw new JbpmException("couldn't open message session", e);
+ catch (JMSException e) {
+ throw new JbpmException("could not open message service", e);
}
}
- public void close()
- {
+ public void close() {
connectionFactory = null;
destination = null;
- commandDestination = null;
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceFactoryImpl.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -28,8 +28,8 @@
import org.jbpm.svc.Service;
/**
- * Alias for {@link JmsMessageServiceFactory}, supplied for compatibility
- * with the jBPM 3.2 series.
+ * Alias for {@link JmsMessageServiceFactory}, supplied for compatibility with jBPM 3.2.2.
+ *
* @deprecated replaced by {@link JmsMessageServiceFactory}
*/
public class JmsMessageServiceFactoryImpl extends JmsMessageServiceFactory {
@@ -40,10 +40,10 @@
try {
Connection connection = getConnectionFactory().createConnection();
return new JmsMessageServiceImpl(connection, getDestination(), isCommitEnabled);
- }
+ }
catch (JMSException e) {
throw new JbpmException("couldn't open message session", e);
- }
+ }
}
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/enterprise/src/main/java/org/jbpm/msg/jms/JmsMessageServiceImpl.java 2009-08-07 08:54:13 UTC (rev 5441)
@@ -26,15 +26,16 @@
import javax.jms.JMSException;
/**
- * Alias for {@link JmsMessageService}, supplied for compatibility with the
- * jBPM 3.2 series.
+ * Alias for {@link JmsMessageService}, supplied for compatibility with jBPM 3.2.2.
+ *
* @deprecated replaced by {@link JmsMessageService}
*/
public class JmsMessageServiceImpl extends JmsMessageService {
private static final long serialVersionUID = 1L;
-
- public JmsMessageServiceImpl(Connection connection, Destination destination, boolean isCommitEnabled) throws JMSException {
+
+ public JmsMessageServiceImpl(Connection connection, Destination destination,
+ boolean isCommitEnabled) throws JMSException {
super(connection, destination, isCommitEnabled);
}
Modified: jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml
===================================================================
--- jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml 2009-08-07 07:33:14 UTC (rev 5440)
+++ jbpm3/branches/jbpm-3.2-soa/modules/userguide/src/main/docbook/en/modules/enterprise.xml 2009-08-07 08:54:13 UTC (rev 5441)
@@ -261,24 +261,35 @@
<itemizedlist>
- <listitem><literal>connectionFactoryJndiName</literal>: the name of the JMS connection factory
- in the JNDI initial context. Defaults to <literal>java:comp/env/jms/JbpmConnectionFactory</literal>.
+ <listitem><literal>connectionFactoryJndiName</literal>: the JNDI name of the JMS connection
+ factory. Defaults to <literal>java:comp/env/jms/JbpmConnectionFactory</literal>.
</listitem>
- <listitem><literal>destinationJndiName</literal>: the name of the JMS destination where job
- messages will be sent. Must match the destination from which <literal>JobListenerBean</literal>
+ <listitem><literal>destinationJndiName</literal>: the JNDI name of the JMS destination where
+ job messages are sent. Must match the destination where <literal>JobListenerBean</literal>
receives messages. Defaults to <literal>java:comp/env/jms/JobQueue</literal>.
</listitem>
- <listitem><literal>isCommitEnabled</literal>: tells whether jBPM should commit the JMS session
- upon <literal>JbpmContext.close()</literal>. Messages produced by the JMS message service are
- never meant to be received before the current transaction commits; hence the JMS sessions
- created by the service are always transacted. The default value -<literal>false</literal>- is
- appropriate when the connection factory in use is XA capable, as the JMS session's produced
- messages will be controlled by the overall JTA transaction. This field should be set to
- <literal>true</literal> if the JMS connection factory is not XA capable so that jBPM commits
- the JMS session's local transaction explicitly.
+ <listitem><literal>isCommitEnabled</literal>: tells whether the message service should
+ create a transacted session and either commit or rollback on close. Messages produced by the
+ JMS message service are never meant to be received before the database transaction commits.
+ The <ulink url="http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS7.html#wp92011">J2EE
+ tutorial</ulink> states "when you create a session in an enterprise bean, the container
+ ignores the arguments you specify, because it manages all transactional properties for
+ enterprise beans". Unfortunately the tutorial fails to indicate that said behavior is not
+ prescriptive. JBoss ignores the <literal>transacted</literal> argument if the
+ connection factory supports XA, since the overall JTA transaction controls the session.
+ Otherwise, <literal>transacted</literal> produces a locally transacted session. In <ulink
+ url="http://e-docs.bea.com/wls/docs103/jms/trans.html">Weblogic</ulink>, JMS transacted
+ sessions are agnostic to JTA transactions even if the connection factory is XA enabled.
+ With <literal>isCommitEnabled</literal> set to <literal>false</literal> (the default),
+ the message service creates a nontransacted, auto-acknowledging session. Such a session
+ works with containers that either disregard the creation arguments or do not bind
+ transacted sessions to JTA. Conversely, setting <literal>isCommitEnabled</literal> to
+ <literal>true</literal> causes the message service to create a transacted session and commit
+ or rollback according to the <literal>TxService.isRollbackOnly</literal> method.
</listitem>
+
</itemizedlist>
<para><literal>EntitySchedulerServiceFactory</literal> builds on the transactional notification
More information about the jbpm-commits
mailing list