[jboss-svn-commits] JBL Code SVN: r19570 - in labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product: rosetta/src/org/jboss/internal/soa/esb/couriers/helpers and 6 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Apr 15 07:17:47 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-04-15 07:17:46 -0400 (Tue, 15 Apr 2008)
New Revision: 19570
Added:
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml
Modified:
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java
Log:
Merge of JBESB_4_2_1_GA_CP2_2
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -22,40 +22,40 @@
package org.jboss.internal.soa.esb.couriers;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.helpers.JDBCEprDBResourceFactory;
import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.common.TransactionStrategy;
import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.CourierTransportException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierServiceBindException;
-import org.jboss.soa.esb.couriers.CourierMarshalUnmarshalException;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
-import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.couriers.*;
import org.jboss.soa.esb.listeners.message.errors.Factory;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.util.Type;
import org.jboss.soa.esb.util.Util;
-import org.xml.sax.SAXParseException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
+ protected long _pollLatency = 200;
+
+ protected long _sleepForRetries = 3000; // milliseconds
+
+ protected boolean deleteOnSuccess, deleteOnError;
+ protected boolean _isReceiver;
+
+ private JDBCEprDBResourceFactory jdbcFactory;
+
+ protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+
/**
* package protected constructor - Objects of Courier should only be
* instantiated by the Factory
@@ -65,7 +65,7 @@
SqlTableCourier(JDBCEpr epr) throws CourierException
{
this(epr, false);
- }
+ }
/**
* package protected constructor - Objects of Courier should only be
@@ -76,13 +76,12 @@
SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
{
_isReceiver = isReceiver;
- _epr = epr;
_sleepForRetries = 3000; // TODO magic number - configurable?
try
{
- _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ deleteOnSuccess = Boolean.TRUE.equals(Boolean.valueOf(epr
.getPostDelete()));
- _errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ deleteOnError = Boolean.TRUE.equals(Boolean.valueOf(epr
.getErrorDelete()));
}
catch (URISyntaxException e)
@@ -90,25 +89,12 @@
throw new CourierException(e);
}
- } // ________________________________
+ jdbcFactory = new JDBCEprDBResourceFactory(epr);
+ }
- public void cleanup()
- {
- if (null != _conn)
- {
- try
- {
- _conn.release();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- _logger.info("Unable to release connection", e);
- }
- }
+ public void cleanup() {
+ }
- } // ________________________________
-
/**
* package the ESB message in a java.io.Serializable, and write it.
* Delivery occurs within its own transaction if there is no
@@ -129,7 +115,7 @@
if (null == message)
return false;
- String msgId = null;
+ String msgId;
Call call = message.getHeader().getCall();
if (null==call)
message.getHeader().setCall(call=new Call());
@@ -144,572 +130,277 @@
throw new CourierException("Problems with message header ",e);
}
- try
- {
- TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
- Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
- boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-
- transactional = (txHandle != null);
-
- /*
- * Make sure the current transaction is still active! If we
- * have previously slept, then the timeout may be longer than that
- * associated with the transaction.
- */
-
- if (transactional && !isActive)
- {
- throw new CourierException("Associated transaction is no longer active!");
- }
- }
- catch (TransactionStrategyException ex)
- {
- throw new CourierException(ex);
- }
-
- if (null == _conn)
- {
- try
- {
- _conn = getConn();
- }
- catch (final Exception e)
- {
- throw new CourierServiceBindException(e);
- }
- }
+ boolean transactional = isTransactional();
- while (_conn != null)
- {
- try
- {
- int iCol = 1;
- PreparedStatement PS = insertStatement();
- PS.setString(iCol++, msgId);
- PS.setObject(iCol++, Util.serialize(message));
- PS.setString(iCol++, State.Pending.getColumnValue());
- PS.setLong(iCol++, System.currentTimeMillis());
+ Serializable serilaizedMessage;
+ try {
+ serilaizedMessage = Util.serialize(message);
+ } catch (Exception e) {
+ throw new CourierTransportException("Unable to serialize ESB Message.", e);
+ }
- _conn.execUpdWait(PS, 3);
-
- if (!transactional)
- _conn.commit();
-
- return true;
- }
- catch (SQLException e)
- {
- if (null != _conn)
- {
- try
- {
- if (!transactional)
- _conn.rollback();
- }
- catch (Exception roll)
- {
- _logger.debug(roll);
- }
- }
-
- _logger.debug("SQL exception during deliver", e);
- throw new CourierTransportException(e);
- }
- catch (Exception e)
- {
- if (!jdbcConnectRetry(e))
- throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
- }
- }
- return false;
- } // ________________________________
+ Connection connection = jdbcFactory.createConnection(transactional);
+ try
+ {
+ PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
+ try {
+ insertStatement.setString(1, msgId);
+ insertStatement.setObject(2, serilaizedMessage);
+ insertStatement.setString(3, State.Pending.getColumnValue());
+ insertStatement.setLong(4, System.currentTimeMillis());
- public Message pickup(long millis) throws CourierException, CourierTimeoutException
- {
- Message result = null;
- long limit = System.currentTimeMillis()
- + ((millis < 100) ? 100 : millis);
-
- do
- {
- try
- {
- TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
- Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
- boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-
- transactional = (txHandle != null);
-
- /*
- * Make sure the current transaction is still active! If we
- * have previously slept, then the timeout may be longer than that
- * associated with the transaction.
- */
-
- /*
- * MessageAwareListener will catch exceptions and roll back the transaction.
- */
-
- if (transactional && !isActive)
- {
- _logger.error("SqlTableCourier - associated transaction is no longer active!");
-
- throw new CourierException("Associated transaction is no longer active!");
- }
- }
- catch (TransactionStrategyException ex)
- {
- _logger.error("Could not determine transaction association!", ex);
-
- throw new CourierException("Could not determine transaction association!");
- }
-
- ResultSet RS = null;
-
- try
- {
- RS = getRowList();
+ insertStatement.executeUpdate();
+ } finally {
+ insertStatement.close();
+ }
- while (null != RS && RS.next())
- {
- String messageId = RS.getString(1);
+ if (!transactional) {
+ connection.commit();
+ }
- if (null == (result = tryToPickup(messageId)))
- continue;
+ return true;
+ }
+ catch (SQLException e)
+ {
+ try
+ {
+ if (!transactional) {
+ connection.rollback();
+ }
+ }
+ catch (Exception roll)
+ {
+ _logger.debug(roll);
+ }
- /*
- * If this is fault message, then throw an exception with the contents. With the
- * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
- */
-
- if (Type.isFaultMessage(result))
- Factory.createExceptionFromFault(result);
-
- return result;
- }
- }
- catch (SQLException e)
- {
- _logger.warn("SQL Exception during pickup", e);
-
- throw new CourierTransportException(e);
- }
- finally
- {
- try
- {
- if (RS != null)
- RS.close();
- }
- catch (final SQLException ex)
- {
- _logger.warn("SQLException during close of ResultSet.", ex);
- }
-
- // Added to make sure we release transactions from all paths
- if (_conn != null)
- {
- try
- {
- if (!transactional)
- _conn.rollback() ;
- }
- catch (final SQLException sqle) {} //ignore
- }
- }
-
- try
- {
- long lSleep = limit - System.currentTimeMillis();
- if (_pollLatency < lSleep)
- lSleep = _pollLatency;
- if (lSleep > 0)
- Thread.sleep(lSleep);
- }
- catch (InterruptedException e)
- {
- return null;
- }
- } while (System.currentTimeMillis() <= limit);
- return null;
- } // ________________________________
-
- private Message tryToPickup(String messageId) throws CourierException,
- SQLException
- {
- int iParm = 1;
-
- select4UpdateStatement().setString(iParm++, messageId);
- select4UpdateStatement().setString(iParm++,
- State.Pending.getColumnValue());
-
- while (_conn != null)
- {
- ResultSet RS = null;
-
- try
- {
- RS = _conn.execQueryWait(select4UpdateStatement(), 3);
-
- while (RS.next())
- {
- Exception eBad = null;
- try
- {
- Message result = Util.deserialize((Serializable) RS
- .getObject(1));
- if (_postDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Done);
- return result;
- }
- catch (ClassCastException e)
- {
- eBad = e;
- }
- catch (SAXParseException e)
- {
- eBad = e;
- }
- catch (final IOException e)
- {
- throw new CourierMarshalUnmarshalException(e);
- }
- catch (Exception e)
- {
- throw new CourierException(e);
- }
- if (null != eBad)
- {
- if (_errorDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Error);
- continue;
- }
- }
- return null;
- }
- catch (SQLException e)
- {
- throw new CourierTransportException(e);
- }
- catch (Exception ex)
- {
- // bail-out now if we can't reconnect, rather than lose the error in the next sweep.
-
- if (!jdbcConnectRetry(ex))
- throw new CourierTransportException("Caught unexpected exception during SQL receive and could not reconnect!", ex);
- }
- finally
- {
- try
- {
- if (RS != null)
- RS.close();
- }
- catch (final Exception ex)
- {
- _logger.warn("Could not close ResultSet.", ex);
- }
- }
- }
- return null;
- } // ________________________________
-
- private void deleteMsg(String messageId) throws SQLException
- {
- int iParm = 1;
- deleteStatement().setString(iParm++, messageId);
- _conn.execUpdWait(deleteStatement(), 3);
-
- if (!transactional)
- _conn.commit();
+ _logger.debug("SQL exception during deliver", e);
+ throw new CourierTransportException(e);
+ } finally {
+ try {
+ if (!transactional) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ _logger.error("Exception while closing DataSource connection.", e);
+ }
+ }
}
- private void changeStatus(String messageId, State to) throws SQLException
+ public Message pickup(long millis) throws CourierException, CourierTimeoutException
{
- int iParm = 1;
- updateStatusStatement().setString(iParm++, to.getColumnValue());
- updateStatusStatement().setString(iParm++, messageId);
- _conn.execUpdWait(updateStatusStatement(), 3);
-
- if (!transactional)
- _conn.commit();
- }
+ Message result = null;
+ long limit = System.currentTimeMillis()
+ + ((millis < 100) ? 100 : millis);
- private ResultSet getRowList() throws CourierException
- {
- if (null == _conn)
+ do
{
- try
- {
- _conn = getConn();
- }
- catch (final Exception e)
- {
- throw new CourierServiceBindException(e);
- }
- }
- while (_conn != null)
- {
- try
- {
- return _conn.execQueryWait(listStatement(), 3);
- }
- catch (Exception e)
- {
- _logger.debug("Problem encountered while executing query.", e);
- e.printStackTrace();
-
- jdbcConnectRetry(e);
- }
- }
- return null;
+ boolean transactional = isTransactional();
+ Connection connection = jdbcFactory.createConnection(transactional);
+ try {
+ PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
+ try {
+ ResultSet resultSet = listStatement.executeQuery();
+ try {
+ while (resultSet.next()) {
+ String messageId = resultSet.getString(1);
- } // _______________________________
+ result = tryToPickup(messageId, connection);
- private boolean jdbcConnectRetry(Exception exc)
- {
- _logger.debug("DB problem, will try to reconnect", exc);
-
- cleanup();
- _conn = null;
+ // We've successfully picked up a message, so we can commit on a
+ // non-transacted connection...
+ if (!transactional) {
+ connection.commit();
+ }
- _prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
- for (int i1 = 0; i1 < 3; i1++)
- {
- try
- {
- _conn = getConn();
- }
- catch (Exception e)
- {
- try
- {
- Thread.sleep(_sleepForRetries);
- }
- catch (InterruptedException eInt)
- {
- return false;
- }
- }
- }
+ if (result != null) {
+ /*
+ * If this is fault message, then throw an exception with the contents. With the
+ * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
+ */
+ if (Type.isFaultMessage(result)) {
+ Factory.createExceptionFromFault(result);
+ } else {
+ return result;
+ }
+ }
+ }
+ } finally {
+ try {
+ resultSet.close();
+ } catch (Exception e) {
+ _logger.warn("SQL Exception closing ResultSet", e);
+ }
+ }
+ } finally {
+ try {
+ listStatement.close();
+ } catch (Exception e) {
+ _logger.warn("SQL Exception closing PreparedStatement", e);
+ }
+ }
+ } catch (FaultMessageException e) {
+ // The picked up message was a fault, generating this exception
+ // in Factory.createExceptionFromFault. Just rethrow...
+ throw e;
+ } catch (Exception e) {
+ _logger.warn("Exception during pickup", e);
+ if (!transactional) {
+ try {
+ connection.rollback();
+ } catch (SQLException e1) {
+ _logger.warn("SQL Exception during rollback", e);
+ }
+ }
+ throw new CourierTransportException(e);
+ } finally {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ _logger.warn("Error closing DataSource Connection.", e);
+ }
+ }
- return !(_conn == null);
- } // ________________________________
+ try {
+ long lSleep = limit - System.currentTimeMillis();
+ if (_pollLatency < lSleep)
+ lSleep = _pollLatency;
+ if (lSleep > 0)
+ Thread.sleep(lSleep);
+ }
+ catch (InterruptedException e) {
+ return null;
+ }
+ } while (System.currentTimeMillis() <= limit);
- private JdbcCleanConn getConn() throws SQLException, MalformedEPRException, NamingException
- {
- if (null == _conn)
- {
- try
- {
- DataSource DS = null;
- if (_epr.getDatasource() == null) {
- DS = new SimpleDataSource(_epr.getDriver(),
- _epr.getURL(), _epr.getUserName(), _epr.getPassword());
- } else {
- InitialContext initContext;
- try {
- initContext = new InitialContext();
- DS = (DataSource) initContext.lookup(_epr.getDatasource());
- } catch (NamingException e) {
- _logger.error("Problem resolving DataSource through JNDI", e);
-
- throw e; // it'll get wrapped later anyway!
- }
- }
- _conn = new JdbcCleanConn(DS, transactional);
- }
- catch (URISyntaxException ex)
- {
- throw new MalformedEPRException(ex);
- }
- }
- return _conn;
- } // ________________________________
+ return null;
+ }
- protected PreparedStatement listStatement() throws SQLException
+ private Message tryToPickup(String messageId, Connection connection) throws CourierException, SQLException
{
- if (null == _prepGetList)
- {
- try
- {
- String[] columns =
- { _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
+ PreparedStatement selectUpdateStatement = jdbcFactory.createSelect4UpdateStatement(connection);
- StringBuilder sb = new StringBuilder("select");
- int i1 = 0;
- for (String col : columns)
- sb.append((i1++ < 1) ? " " : ",").append(col);
- sb.append(" from ").append(_epr.getTableName());
- sb.append(" where ").append(_epr.getStatusColumn())
- .append("='").append(State.Pending.getColumnValue())
- .append("'").append(" order by 2");
- _prepGetList = getConn().prepareStatement(sb.toString());
- }
- catch (SQLException ex)
- {
- throw ex;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- _logger.debug("Unable to prepare SQL statement", e);
- throw new SQLException("Problem encountered when trying to created PreparedStatement: "+e);
- }
- }
-
- return _prepGetList;
- } // ________________________________
+ try {
+ selectUpdateStatement.setString(1, messageId);
+ selectUpdateStatement.setString(2, State.Pending.getColumnValue());
- protected PreparedStatement select4UpdateStatement()
- {
- if (_prepSel4Upd == null)
- {
- try
- {
- /*
- * TODO make this dynamic using a factory pattern.
- */
+ ResultSet resultSet = selectUpdateStatement.executeQuery();
+ try
+ {
+ if (resultSet.next())
+ {
+ Message result = null;
- StringBuilder sb = null;
+ try
+ {
+ Serializable blob = (Serializable) resultSet.getObject(1);
+ result = Util.deserialize(blob);
+ }
+ catch (Exception e)
+ {
+ // If there's an error deserializing the message blob, we either
+ // delete the message (deleteOnError), or change it's state
+ // to "State.Error" i.e. no exceptions/rollbacks...
+ result = null;
+ } finally {
+ if (result == null && deleteOnError) {
+ deleteMsg(messageId, connection);
+ } else if (result != null && deleteOnSuccess) {
+ deleteMsg(messageId, connection);
+ } else if(result == null) {
+ changeStatus(messageId, State.Error, connection);
+ } else {
+ changeStatus(messageId, State.Done, connection);
+ }
+ }
- if (!_epr.getURL().contains("hsqldb"))
- {
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?").append(" for update");
- }
- else
- {
- /*
- * HSQL does not support FOR UPDATE! All tables appear to
- * be inherently updatable!
- */
-
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?");
- }
+ return result;
+ }
+ }
+ finally
+ {
+ try
+ {
+ resultSet.close();
+ } catch (final Exception ex) {
+ _logger.warn("Could not close ResultSet.", ex);
+ }
+ }
+ } finally {
+ selectUpdateStatement.close();
+ }
- _prepSel4Upd = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.debug(e);
- return null;
- }
- }
+ return null;
+ }
- return _prepSel4Upd;
- } // ________________________________
-
- protected PreparedStatement updateStatusStatement()
+ private void deleteMsg(String messageId, Connection connection) throws SQLException
{
- if (null == _prepUpdateStatus)
- try
- {
- StringBuilder sb = new StringBuilder("update ").append(
- _epr.getTableName()).append(" set ").append(
- _epr.getStatusColumn()).append("= ?").append(" where ")
- .append(_epr.getMessageIdColumn()).append("=?");
- _prepUpdateStatus = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.debug(e);
- return null;
- }
- return _prepUpdateStatus;
- } // ________________________________
+ PreparedStatement statement = jdbcFactory.createDeleteStatement(connection);
- protected PreparedStatement insertStatement()
+ try {
+ statement.setString(1, messageId);
+ statement.executeUpdate();
+ } finally {
+ statement.close();
+ }
+ }
+
+ private void changeStatus(String messageId, State to, Connection connection) throws SQLException
{
- if (null == _prepInsert)
- try
- {
- String[] columns =
- { _epr.getMessageIdColumn(), _epr.getDataColumn(),
- _epr.getStatusColumn(), _epr.getTimestampColumn() };
+ PreparedStatement statement = jdbcFactory.createUpdateStatusStatement(connection);
- StringBuilder sb = new StringBuilder("insert into ").append(
- _epr.getTableName()).append("(");
- int i1 = 0;
- for (String col : columns)
- sb.append((i1++ < 1) ? " " : ",").append(col);
- sb.append(") values (?,?,?,?)");
- _prepInsert = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.debug(e);
- return null;
- }
- return _prepInsert;
- } // ________________________________
+ try {
+ statement.setString(1, to.getColumnValue());
+ statement.setString(2, messageId);
+ statement.executeUpdate();
+ } finally {
+ statement.close();
+ }
+ }
- protected PreparedStatement deleteStatement()
+ public static enum State
{
- if (null == _prepDelete)
- try
- {
- StringBuilder sb = new StringBuilder("delete from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append(" =?");
- _prepDelete = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e)
- {
- _logger.debug(e);
- return null;
- }
- return _prepDelete;
- } // ________________________________
-
- protected enum State
- {
Pending, WorkInProgress, Done, Error;
- String getColumnValue()
+
+ public String getColumnValue()
{
return toString().substring(0, 1);
}
- }
- public void setPollLatency(Long millis)
+ }
+
+ public void setPollLatency(Long millis)
{
if (millis <= 200)
_logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of "+_pollLatency);
else
_pollLatency = millis;
- } // ________________________________
-
- protected long _pollLatency = 200;
- protected long _sleepForRetries = 3000; // milliseconds
+ }
- protected boolean _postDelete, _errorDelete;
- protected boolean _isReceiver;
+ private boolean isTransactional() throws CourierException {
+ boolean transactional;
+ try
+ {
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+ Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+ boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
- protected JDBCEpr _epr;
+ transactional = (txHandle != null);
- protected JdbcCleanConn _conn;
+ /*
+ * Make sure the current transaction is still active! If we
+ * have previously slept, then the timeout may be longer than that
+ * associated with the transaction.
+ */
- protected PreparedStatement _prepGetList;
- protected PreparedStatement _prepSel4Upd;
- protected PreparedStatement _prepUpdateStatus;
- protected PreparedStatement _prepInsert;
- protected PreparedStatement _prepDelete;
-
- private boolean transactional = false;
-
- protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+ if (transactional && !isActive)
+ {
+ throw new CourierException("Associated transaction is no longer active!");
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ throw new CourierException(ex);
+ }
+ return transactional;
+ }
}
\ No newline at end of file
Copied: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java (from rev 19569, labs/jbossesb/tags/JBESB_4_2_1_GA_CP2_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java)
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java (rev 0)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers.helpers;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.SqlTableCourier;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierServiceBindException;
+import org.jboss.soa.esb.couriers.CourierTransportException;
+import org.jboss.soa.esb.util.ClassUtil;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.DriverManager;
+
+/**
+ * Factory for creating JDBCEpr based database resources for the SQLTableCourier..
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JDBCEprDBResourceFactory {
+
+ private static Logger logger = Logger.getLogger(JDBCEprDBResourceFactory.class);
+
+ private JDBCEpr epr;
+
+ private DataSource dataSource;
+ private String insertStatementSQL;
+ private String listStatementSQL;
+ private String select4UpdateStatementSQL;
+ private String updateStatusStatementSQL;
+ private String deleteStatementSQL;
+
+ public JDBCEprDBResourceFactory(JDBCEpr epr) throws CourierServiceBindException {
+ this.epr = epr;
+
+ if (epr.getDatasource() != null) {
+ lookupDataSource(epr);
+ } else {
+ try {
+ try {
+ ClassUtil.forName(epr.getDriver(), getClass());
+ } catch (ClassNotFoundException e) {
+ throw new CourierServiceBindException("Database driver '" + epr.getDriver() + "' not available on classpath.");
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+ }
+ }
+
+
+ public Connection createConnection(boolean transactional) throws CourierTransportException, CourierServiceBindException {
+ Connection connection;
+
+ try {
+ if(dataSource != null) {
+ connection = dataSource.getConnection();
+ } else {
+ try {
+ connection = DriverManager.getConnection(epr.getURL(), epr.getUserName(), epr.getPassword());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+ }
+ } catch (SQLException e) {
+ throw new CourierTransportException("Failed to connect to DataSource.", e);
+ }
+
+ if (!transactional) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ try {
+ connection.close();
+ } catch (SQLException e1) {
+ logger.error("Failed to close connection.", e1);
+ }
+ throw new CourierTransportException("Failed to turn off autoCommit on connection..", e);
+ }
+ }
+
+ return connection;
+ }
+
+ public PreparedStatement createListStatement(Connection connection) throws SQLException {
+ if (listStatementSQL == null) {
+ listStatementSQL = buildListStatementSQL();
+ }
+ return connection.prepareStatement(listStatementSQL);
+ }
+
+ public PreparedStatement createSelect4UpdateStatement(Connection connection) throws SQLException {
+ if (select4UpdateStatementSQL == null) {
+ select4UpdateStatementSQL = buildSelect4UpdateStatementSQL();
+ }
+ return connection.prepareStatement(select4UpdateStatementSQL);
+ }
+
+ public PreparedStatement createUpdateStatusStatement(Connection connection) throws SQLException {
+ if (updateStatusStatementSQL == null) {
+ updateStatusStatementSQL = buildUpdateStatusStatementSQL();
+ }
+ return connection.prepareStatement(updateStatusStatementSQL);
+ }
+
+ public PreparedStatement createInsertStatement(Connection connection) throws SQLException {
+ if (insertStatementSQL == null) {
+ insertStatementSQL = buildInsertStatementSQL();
+ }
+ return connection.prepareStatement(insertStatementSQL);
+ }
+
+ public PreparedStatement createDeleteStatement(Connection connection) throws SQLException {
+ if (deleteStatementSQL == null) {
+ deleteStatementSQL = buildDeleteStatementSQL();
+ }
+ return connection.prepareStatement(deleteStatementSQL);
+ }
+
+ private String buildSelect4UpdateStatementSQL() {
+ StringBuilder sb = new StringBuilder("select ");
+
+ try {
+ if (!epr.getURL().contains("hsqldb")) {
+ sb = sb.append(
+ epr.getDataColumn()).append(" from ").append(
+ epr.getTableName()).append(" where ").append(
+ epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(epr.getStatusColumn())
+ .append("=?").append(" for update");
+ } else {
+ /*
+ * HSQL does not support FOR UPDATE! All tables appear to
+ * be inherently updatable!
+ */
+ sb = sb.append(
+ epr.getDataColumn()).append(" from ").append(
+ epr.getTableName()).append(" where ").append(
+ epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(epr.getStatusColumn())
+ .append("=?");
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+
+ return sb.toString();
+ }
+
+ private String buildUpdateStatusStatementSQL() {
+ try {
+ StringBuilder sb = new StringBuilder("update ").append(
+ epr.getTableName()).append(" set ").append(
+ epr.getStatusColumn()).append("= ?").append(" where ")
+ .append(epr.getMessageIdColumn()).append("=?");
+
+ return sb.toString();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+ }
+
+ private String buildInsertStatementSQL() {
+ try {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("insert into ").append(epr.getTableName());
+ sb.append(" (");
+ sb.append(epr.getMessageIdColumn()).append(", ");
+ sb.append(epr.getDataColumn()).append(", ");
+ sb.append(epr.getStatusColumn()).append(", ");
+ sb.append(epr.getTimestampColumn());
+ sb.append(") values (?,?,?,?)");
+
+ return sb.toString();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+ }
+
+ private String buildListStatementSQL() {
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("select ");
+ sb.append(epr.getMessageIdColumn()).append(", ");
+ sb.append(epr.getTimestampColumn());
+ sb.append(" from ").append(epr.getTableName());
+ sb.append(" where ").append(epr.getStatusColumn());
+ sb.append(" = '").append(SqlTableCourier.State.Pending.getColumnValue()).append("'");
+ sb.append(" order by 2");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+
+ return sb.toString();
+ }
+
+ private String buildDeleteStatementSQL() {
+ try {
+ StringBuilder sb = new StringBuilder("delete from ").append(
+ epr.getTableName()).append(" where ").append(
+ epr.getMessageIdColumn()).append(" =?");
+
+ return sb.toString();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+ }
+ }
+
+ private void lookupDataSource(JDBCEpr epr) throws CourierServiceBindException {
+ try
+ {
+ if (epr.getDatasource() != null) {
+ InitialContext initContext;
+ try {
+ initContext = new InitialContext();
+ dataSource = (DataSource) initContext.lookup(epr.getDatasource());
+ } catch (NamingException e) {
+ logger.error("Problem resolving DataSource through JNDI", e);
+ throw e; // it'll get wrapped later anyway!
+ }
+ }
+ }
+ catch (final Exception e)
+ {
+ throw new CourierServiceBindException("Failed to lookup DataSource.", e);
+ }
+ }
+}
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -188,11 +188,6 @@
ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
if (freeSessions.size() > 0)
{
- if (logger.isDebugEnabled()) {
- logger.debug("Returning session, poolsize=" + getSessionsInPool()
- + ", maxsize=" + MAX_SESSIONS
- + ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
- }
final JmsSession session = freeSessions.remove(freeSessions.size()-1);
inUseSessions.add(session);
return session ;
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -61,12 +61,12 @@
/**
* Cleanup actions
*/
- private enum Cleanup { close, release }
+ private enum Cleanup { close, release, none }
/**
* The cleanup action for the synchronization.
*/
- private Cleanup cleanupAction = Cleanup.close ;
+ private Cleanup cleanupAction = Cleanup.none ;
/**
* Create the session wrapper.
@@ -132,14 +132,28 @@
return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
}
- protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
{
- cleanupAction = Cleanup.close ;
+ if (associated)
+ {
+ cleanupAction = Cleanup.close ;
+ }
+ else
+ {
+ pool.handleCloseSession(this) ;
+ }
}
- protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+ protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
{
- cleanupAction = Cleanup.release ;
+ if (associated)
+ {
+ cleanupAction = Cleanup.release ;
+ }
+ else
+ {
+ pool.handleReleaseSession(this) ;
+ }
}
protected synchronized void associate()
@@ -147,6 +161,7 @@
{
if (!associated)
{
+ cleanupAction = Cleanup.none ;
final XAResource resource = session.getXAResource() ;
final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
try
@@ -190,6 +205,8 @@
case release:
pool.handleReleaseSession(this) ;
break ;
+ case none:
+ // Reference held by caller
}
associated = false ;
}
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -83,17 +83,21 @@
public void release()
{
- if ((null != m_conn) && (!transactional))
+ if (null != m_conn)
{
- try
+ if (!transactional)
{
- m_conn.rollback();
+ try
+ {
+ m_conn.rollback();
+ }
+ catch (Exception eRoll)
+ {
+ }
}
- catch (Exception eRoll)
- {
- }
for (PreparedStatement PS : m_olPrepSt)
+ {
try
{
PS.close();
@@ -101,6 +105,7 @@
catch (Exception e)
{
}
+ }
try
{
m_conn.close();
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -32,12 +32,13 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.FaultMessageException;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -86,6 +87,10 @@
*/
private long errorDelay ;
+ private TransactionStrategy transactionStrategy;
+ private boolean transactional = false;
+ private boolean rollbackOnPipelineFaults = true;
+
/**
* public constructor
*
@@ -156,6 +161,11 @@
}
}
_latencySecs = lSeconds ;
+
+ transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+ transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+
+ rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
}
/**
@@ -170,32 +180,20 @@
try
{
pipeline = new ActionProcessingPipeline(_config) ;
+ pipeline.setTransactional(transactional);
pipeline.initialise() ;
}
catch (final ConfigurationException ce)
{
throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
}
+
this.pipeline = pipeline ;
- final TwoWayCourier pickUpCourier ;
+ final PickUpOnlyCourier pickUpCourier ;
try
{
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
- throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
- }
+ pickUpCourier = getCourier() ;
+ cleanCourier(pickUpCourier) ;
}
catch (final MalformedEPRException mepre)
{
@@ -205,16 +203,13 @@
{
throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
}
-
- _pickUpCourier = pickUpCourier ;
-
+
try
{
RegistryUtil.register(_config, _epr);
}
catch (final RegistryException re)
{
- CourierUtil.cleanCourier(_pickUpCourier);
throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
}
}
@@ -261,17 +256,49 @@
}
}
+ /**
+ * We have JMS transactional delivery/work semantics: before pulling a unit of work
+ * we start a transaction. If the pipeline completes successfully then we will
+ * commit that transaction and the OUW will be deleted. If we have to roll back
+ * the transaction then the UOW will be placed back on the input "queue" (assumes that
+ * the courier is transactional).
+ *
+ * @param maxWaitMillis
+ */
public void waitForEventAndProcess (long maxWaitMillis)
{
Message message = null ;
+ boolean problem = false;
+
+ PickUpOnlyCourier pickUpCourier = null ;
try
{
- message = (maxWaitMillis > 0) ? _pickUpCourier
+ transactionStrategy.begin();
+
+ pickUpCourier = getCourier() ;
+
+ message = (maxWaitMillis > 0) ? pickUpCourier
.pickup(maxWaitMillis) : null;
errorDelay = 0 ;
}
+ catch (TransactionStrategyException ex)
+ {
+ _logger.error("Could not begin transaction!");
+
+ problem = true;
+
+ return;
+ }
+ catch (MalformedEPRException e)
+ {
+ problem = true;
+
+ return;
+ }
catch (CourierTimeoutException e)
{
+ problem = true;
+
return;
}
catch (FaultMessageException fme)
@@ -291,25 +318,39 @@
}
_logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+ problem = true;
+
return;
}
+ finally
+ {
+ if (problem || (message == null))
+ {
+ cleanCourier(pickUpCourier) ;
+
+ rollbackTransaction();
+ }
+ }
if (null != message)
{
- final Message pipelineMessage = message ;
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(pipelineMessage) ;
- } finally {
- updateThreadCount(-1) ;
- }
- }
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
+ try
+ {
+ final Message pipelineMessage = message ;
+ final Object txHandle = transactionStrategy.suspend();
+ final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
+
+ updateThreadCount(+1);
+ _execService.execute(txRunner);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ _logger.warn("Caught transaction related exception: ", ex);
+ cleanCourier(pickUpCourier);
+ rollbackTransaction();
+ }
}
-
} // ________________________________
/**
@@ -403,7 +444,131 @@
}
}
}
+
+ private void rollbackTransaction ()
+ {
+ try
+ {
+ transactionStrategy.rollbackOnly();
+ transactionStrategy.terminate();
+ }
+ catch (Throwable ex)
+ {
+ _logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+ }
+ }
+
+ private PickUpOnlyCourier getCourier()
+ throws MalformedEPRException, CourierException
+ {
+ PickUpOnlyCourier pickUpCourier = _pickUpCourier;
+ if (transactional || (pickUpCourier == null))
+ {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+ try
+ {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[] { Long.class });
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+ }
+ catch (final NoSuchMethodException nsme)
+ {
+ // OK, just leave it null
+ }
+ catch (final Throwable th)
+ {
+ CourierUtil.cleanCourier(pickUpCourier);
+ throw new CourierException("Problems invoking setPollLatency(long)", th);
+ }
+
+ if (!transactional)
+ {
+ _pickUpCourier = pickUpCourier ;
+ }
+ }
+ return pickUpCourier;
+ }
+
+ private void cleanCourier(final PickUpOnlyCourier pickUpOnlyCourier)
+ {
+ if (transactional)
+ {
+ CourierUtil.cleanCourier(pickUpOnlyCourier) ;
+ }
+ }
+
+ class TransactionalRunner implements Runnable
+ {
+ public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+ {
+ _courier = courier;
+ _pipelineMessage = pipelineMessage;
+ _txHandle = txHandle;
+ }
+
+ public void run()
+ {
+ boolean problem = false;
+
+ try
+ {
+ if (_txHandle != null)
+ {
+ transactionStrategy.resume(_txHandle);
+ }
+
+ /*
+ * Current strategy is to commit as long as process returns true.
+ * If fails, or any exceptions are caught, then we roll back.
+ *
+ * TODO re-examine the semantics around true/false from the pipeline.
+ */
+
+ // TODO consider adding a RollbackOnFalse option to allow override.
+
+ problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+ if (!problem)
+ {
+ transactionStrategy.terminate();
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught transaction exception: ", ex);
+ }
+ catch (RuntimeException ex)
+ {
+ problem = true;
+
+ throw ex;
+ }
+ catch (Throwable ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught throwable: ",ex);
+ }
+ finally
+ {
+ cleanCourier(_courier);
+ if (problem)
+ {
+ rollbackTransaction();
+ }
+
+ updateThreadCount(-1);
+ }
+ }
+
+ private PickUpOnlyCourier _courier;
+ private Message _pipelineMessage;
+ private Object _txHandle;
+ }
+
private ConfigTree _config;
private String _eprCategoryName;
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -33,8 +33,11 @@
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.addressing.util.DefaultReplyTo;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
import org.jboss.soa.esb.message.Message;
@@ -118,52 +121,43 @@
@Test
- public void testJdbcReplyEpr()
- {
+ public void testJdbcReplyEpr() throws MalformedEPRException, CourierException, URISyntaxException, CourierTimeoutException {
_logger.info("_________________________________________");
_logger.info("testJdbcReplyEpr() invoked");
- try
- {
- // Send a Message that will be picked up by a listener, and specify replyTo
- JDBCEpr toEpr = getEpr("foo");
- JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
+ // Send a Message that will be picked up by a listener, and specify replyTo
+ JDBCEpr toEpr = getEpr("foo");
+ JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
- String text_1 = "Outgoing";
- Message outgoingMsg = MessageFactory.getInstance().getMessage();
- outgoingMsg.getHeader().getCall().setTo(toEpr);
- outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
- outgoingMsg.getBody().add(text_1.getBytes());
- CourierUtil.deliverMessage(outgoingMsg);
+ String text_1 = "Outgoing";
+ Message outgoingMsg = MessageFactory.getInstance().getMessage();
+ outgoingMsg.getHeader().getCall().setTo(toEpr);
+ outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
+ outgoingMsg.getBody().add(text_1.getBytes());
+ CourierUtil.deliverMessage(outgoingMsg);
- // Mock a service that picks up the original message and replies
- JDBCEpr serviceEpr = getEpr("foo");
- PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
- Message received = listener.pickup(100);
- String text_2 = new String((byte[]) received.getBody().get());
- assertTrue(text_1.equals(text_2));
+ // Mock a service that picks up the original message and replies
+ JDBCEpr serviceEpr = getEpr("foo");
+ PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
+ Message received = listener.pickup(100);
+ String text_2 = new String((byte[]) received.getBody().get());
+ assertTrue(text_1.equals(text_2));
// assertTrue(replyToEpr.equals(received.getHeader().getCall().getReplyTo()));
-
- // now respond to replyTo
- text_2 += " + processed by listener";
- Message response = MessageFactory.getInstance().getMessage();
- response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
- response.getBody().add(text_2.getBytes());
- CourierUtil.deliverMessage(response);
-
- // try to pick up reply
- PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
- Message finalMsg = waiter.pickup(100);
- assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
-
- _logger.info(text_2+"... and back from jdbc ReplyTo EPR");
- _logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
- }
- catch (Exception e)
- {
- _logger.error(e);
- assertTrue(false);
- }
+ // now respond to replyTo
+ text_2 += " + processed by listener";
+ Message response = MessageFactory.getInstance().getMessage();
+ response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
+ response.getBody().add(text_2.getBytes());
+ CourierUtil.deliverMessage(response);
+
+ // try to pick up reply
+ PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
+ Message finalMsg = waiter.pickup(100);
+ assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
+
+ _logger.info(text_2+"... and back from jdbc ReplyTo EPR");
+ _logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
+
}
private static void dropTable(String tableName) throws Exception
{
Copied: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml (from rev 19569, labs/jbossesb/tags/JBESB_4_2_1_GA_CP2_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml)
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml (rev 0)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml 2008-04-15 11:17:46 UTC (rev 19570)
@@ -0,0 +1,3 @@
+<jbossesb-deployment>
+ <depends>jboss.esb:service=QuickstartDatabaseInitializer</depends>
+</jbossesb-deployment>
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java 2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java 2008-04-15 11:17:46 UTC (rev 19570)
@@ -24,13 +24,11 @@
import org.jboss.soa.esb.actions.AbstractActionLifecycle;
import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.body.content.BytesBody;
public class MyAction extends AbstractActionLifecycle
{
- private static int checkIter = 0;
+ private boolean fail ;
protected ConfigTree _config;
@@ -51,11 +49,8 @@
if ("data 22".equals(curr.getValue()))
{
System.out.println("DATA READ: "+curr.getValue());
-
- if (checkIter++ < 2) {
- problem = true;
- break;
- }
+ fail = !fail ;
+ problem = fail ;
}
}
System.out.println(results.toString());
More information about the jboss-svn-commits
mailing list