[jboss-svn-commits] JBL Code SVN: r19570 - in labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product: rosetta/src/org/jboss/internal/soa/esb/couriers/helpers and 6 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Apr 15 07:17:47 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-04-15 07:17:46 -0400 (Tue, 15 Apr 2008)
New Revision: 19570

Added:
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml
Modified:
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
   labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java
Log:
Merge of JBESB_4_2_1_GA_CP2_2

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -22,40 +22,40 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.helpers.JDBCEprDBResourceFactory;
 import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.common.TransactionStrategy;
 import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.CourierTransportException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierServiceBindException;
-import org.jboss.soa.esb.couriers.CourierMarshalUnmarshalException;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
-import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.couriers.*;
 import org.jboss.soa.esb.listeners.message.errors.Factory;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.util.Type;
 import org.jboss.soa.esb.util.Util;
-import org.xml.sax.SAXParseException;
 
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
+    protected long _pollLatency = 200;
+
+    protected long _sleepForRetries = 3000; // milliseconds
+
+    protected boolean deleteOnSuccess, deleteOnError;
+	protected boolean _isReceiver;
+
+    private JDBCEprDBResourceFactory jdbcFactory;
+
+	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+
 	/**
 	 * package protected constructor - Objects of Courier should only be
 	 * instantiated by the Factory
@@ -65,7 +65,7 @@
 	SqlTableCourier(JDBCEpr epr) throws CourierException
 	{
 		this(epr, false);
-	}
+    }
 
 	/**
 	 * package protected constructor - Objects of Courier should only be
@@ -76,13 +76,12 @@
 	SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
 	{
 		_isReceiver = isReceiver;
-		_epr = epr;
 		_sleepForRetries = 3000;  // TODO magic number - configurable?
 		try
 		{
-			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+			deleteOnSuccess = Boolean.TRUE.equals(Boolean.valueOf(epr
 					.getPostDelete()));
-			_errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+			deleteOnError = Boolean.TRUE.equals(Boolean.valueOf(epr
 					.getErrorDelete()));
 		}
 		catch (URISyntaxException e)
@@ -90,25 +89,12 @@
 			throw new CourierException(e);
 		}
 
-	} // ________________________________
+        jdbcFactory = new JDBCEprDBResourceFactory(epr);
+	}
 
-	public void cleanup()
-	{
-		if (null != _conn)
-		{
-			try
-			{
-				_conn.release();
-			}
-			catch (Exception e)
-			{
-				e.printStackTrace();
-				_logger.info("Unable to release connection", e);
-			}
-		}
+	public void cleanup() {
+	}
 
-	} // ________________________________
-
 	/**
 	 * package the ESB message in a java.io.Serializable, and write it.
 	 * Delivery occurs within its own transaction if there is no
@@ -129,7 +115,7 @@
 		if (null == message)
 			return false;
 
-		String msgId = null;
+		String msgId;
 		Call call = message.getHeader().getCall();
 		if (null==call)
 			message.getHeader().setCall(call=new Call());
@@ -144,572 +130,277 @@
 			throw new CourierException("Problems with message header ",e);
 		}
 
-		try
-		{
-        		TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
-        		Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
-        		boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-        		
-        		transactional = (txHandle != null);
-        		
-        		/*
-        		 * Make sure the current transaction is still active! If we
-        		 * have previously slept, then the timeout may be longer than that
-        		 * associated with the transaction.
-        		 */
-        		
-        		if (transactional && !isActive)
-        		{
-        			throw new CourierException("Associated transaction is no longer active!");
-        		}
-		}
-		catch (TransactionStrategyException ex)
-		{
-		    throw new CourierException(ex);
-		}
-		
-		if (null == _conn)
-		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (final Exception e)
-			{
-				throw new CourierServiceBindException(e);
-			}
-		}
+        boolean transactional = isTransactional();
 
-		while (_conn != null)
-		{
-			try
-			{
-				int iCol = 1;
-				PreparedStatement PS = insertStatement();
-				PS.setString(iCol++, msgId);
-				PS.setObject(iCol++, Util.serialize(message));
-				PS.setString(iCol++, State.Pending.getColumnValue());
-				PS.setLong(iCol++, System.currentTimeMillis());
+        Serializable serilaizedMessage;
+        try {
+            serilaizedMessage = Util.serialize(message);
+        } catch (Exception e) {
+            throw new CourierTransportException("Unable to serialize ESB Message.", e);
+        }
 
-				_conn.execUpdWait(PS, 3);
-				
-				if (!transactional)
-				    _conn.commit();
-				
-				return true;
-			}
-			catch (SQLException e)
-			{
-				if (null != _conn)
-				{
-					try
-					{
-					    if (!transactional)
-						_conn.rollback();
-					}
-					catch (Exception roll)
-					{
-						_logger.debug(roll);
-					}
-				}
-				
-				_logger.debug("SQL exception during deliver", e);
-				throw new CourierTransportException(e);
-			}
-			catch (Exception e)
-			{
-				if (!jdbcConnectRetry(e))
-				    throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
-			}
-		}
-		return false;
-	} // ________________________________
+        Connection connection = jdbcFactory.createConnection(transactional);
+        try
+        {
+            PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
+            try {
+                insertStatement.setString(1, msgId);
+                insertStatement.setObject(2, serilaizedMessage);
+                insertStatement.setString(3, State.Pending.getColumnValue());
+                insertStatement.setLong(4, System.currentTimeMillis());
 
-	public Message pickup(long millis) throws CourierException, CourierTimeoutException
-	{
-		Message result = null;
-		long limit = System.currentTimeMillis()
-				+ ((millis < 100) ? 100 : millis);
-		
-		do
-		{
-			try
-			{
-				TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
-				Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
-				boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-				
-				transactional = (txHandle != null);
-				
-				/*
-				 * Make sure the current transaction is still active! If we
-				 * have previously slept, then the timeout may be longer than that
-				 * associated with the transaction.
-				 */
-				
-				/*
-				 * MessageAwareListener will catch exceptions and roll back the transaction.
-				 */
-				
-				if (transactional && !isActive)
-				{
-				    _logger.error("SqlTableCourier - associated transaction is no longer active!");
-				   
-				    throw new CourierException("Associated transaction is no longer active!");
-				}
-			}
-			catch (TransactionStrategyException ex)
-			{
-				_logger.error("Could not determine transaction association!", ex);
-				
-				throw new CourierException("Could not determine transaction association!");
-			}
-			
-			ResultSet RS = null;
-			
-			try
-			{
-			    RS = getRowList();
+                insertStatement.executeUpdate();
+            } finally {
+                insertStatement.close();
+            }
 
-			    while (null != RS && RS.next())
-			    {
-				String messageId = RS.getString(1);
+            if (!transactional) {
+                connection.commit();
+            }
 
-				if (null == (result = tryToPickup(messageId)))
-				    continue;
+            return true;
+        }
+        catch (SQLException e)
+        {
+            try
+            {
+                if (!transactional) {
+                    connection.rollback();
+                }
+            }
+            catch (Exception roll)
+            {
+                _logger.debug(roll);
+            }
 
-				/*
-				 * If this is fault message, then throw an exception with the contents. With the
-				 * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
-				 */
-
-				if (Type.isFaultMessage(result))
-				    Factory.createExceptionFromFault(result);
-
-				return result;
-			    }
-			}
-			catch (SQLException e)
-			{
-			    _logger.warn("SQL Exception during pickup", e);
-			    
-			    throw new CourierTransportException(e);
-			}
-			finally
-			{
-			    try
-			    {
-        			    if (RS != null)
-        				RS.close();
-			    }
-			    catch (final SQLException ex)
-			    {
-				_logger.warn("SQLException during close of ResultSet.", ex);
-			    }
-			    
-			    // Added to make sure we release transactions from all paths
-			    if (_conn != null)
-			    {
-				try
-				{
-				    if (!transactional)
-					_conn.rollback() ;
-				}
-				catch (final SQLException sqle) {} //ignore
-			    }
-			}
-			
-			try
-			{
-			    long lSleep = limit - System.currentTimeMillis();
-			    if (_pollLatency < lSleep)
-				lSleep = _pollLatency;
-			    if (lSleep > 0)
-				Thread.sleep(lSleep);
-			}
-			catch (InterruptedException e)
-			{
-			    return null;
-			}
-		} while (System.currentTimeMillis() <= limit);
-		return null;
-	} // ________________________________
-
-	private Message tryToPickup(String messageId) throws CourierException,
-			SQLException
-	{
-		int iParm = 1;
-
-		select4UpdateStatement().setString(iParm++, messageId);
-		select4UpdateStatement().setString(iParm++,
-				State.Pending.getColumnValue());
-
-		while (_conn != null)
-		{
-		    ResultSet RS = null;
-		    
-			try
-			{
-				RS = _conn.execQueryWait(select4UpdateStatement(), 3);
-				
-				while (RS.next())
-				{
-					Exception eBad = null;
-					try
-					{
-						Message result = Util.deserialize((Serializable) RS
-								.getObject(1));
-						if (_postDelete)
-							deleteMsg(messageId);
-						else
-							changeStatus(messageId, State.Done);
-						return result;
-					}
-					catch (ClassCastException e)
-					{
-						eBad = e;
-					}
-					catch (SAXParseException e)
-					{
-						eBad = e;
-					}
-					catch (final IOException e)
-					{
-					    throw new CourierMarshalUnmarshalException(e);
-					}
-					catch (Exception e)
-					{
-						throw new CourierException(e);
-					}
-					if (null != eBad)
-					{
-						if (_errorDelete)
-							deleteMsg(messageId);
-						else
-							changeStatus(messageId, State.Error);
-						continue;
-					}
-				}
-				return null;
-			}
-			catch (SQLException e)
-			{
-				throw new CourierTransportException(e);
-			}
-			catch (Exception ex)
-			{
-			    // bail-out now if we can't reconnect, rather than lose the error in the next sweep.
-			    
-				if (!jdbcConnectRetry(ex))
-				    throw new CourierTransportException("Caught unexpected exception during SQL receive and could not reconnect!", ex);
-			}
-			finally
-			{
-			    try
-			    {
-				if (RS != null)
-				    RS.close();
-			    }
-			    catch (final Exception ex)
-			    {
-				_logger.warn("Could not close ResultSet.", ex);
-			    }
-			}
-		}
-		return null;
-	} // ________________________________
-
-	private void deleteMsg(String messageId) throws SQLException
-	{
-		int iParm = 1;
-		deleteStatement().setString(iParm++, messageId);
-		_conn.execUpdWait(deleteStatement(), 3);
-		
-		if (!transactional)
-			_conn.commit();
+            _logger.debug("SQL exception during deliver", e);
+            throw new CourierTransportException(e);
+        } finally {
+            try {
+                if (!transactional) {
+                    connection.close();
+                }
+            } catch (SQLException e) {
+                _logger.error("Exception while closing DataSource connection.", e);
+            }
+        }
 	}
 
-	private void changeStatus(String messageId, State to) throws SQLException
+    public Message pickup(long millis) throws CourierException, CourierTimeoutException
 	{
-		int iParm = 1;
-		updateStatusStatement().setString(iParm++, to.getColumnValue());
-		updateStatusStatement().setString(iParm++, messageId);
-		_conn.execUpdWait(updateStatusStatement(), 3);
-		
-		if (!transactional)
-			_conn.commit();
-	}
+		Message result = null;
+		long limit = System.currentTimeMillis()
+				+ ((millis < 100) ? 100 : millis);
 
-	private ResultSet getRowList() throws CourierException
-	{
-		if (null == _conn)
+		do
 		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (final Exception e)
-			{
-				throw new CourierServiceBindException(e);
-			}
-		}
-		while (_conn != null)
-		{
-			try
-			{
-				return _conn.execQueryWait(listStatement(), 3);
-			}
-			catch (Exception e)
-			{
-				_logger.debug("Problem encountered while executing query.", e);
-				e.printStackTrace();
-				
-				jdbcConnectRetry(e);
-			}
-		}
-		return null;
+            boolean transactional = isTransactional();
+            Connection connection = jdbcFactory.createConnection(transactional);
+            try {
+                PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
+                try {
+                    ResultSet resultSet = listStatement.executeQuery();
+                    try {
+                        while (resultSet.next()) {
+                            String messageId = resultSet.getString(1);
 
-	} // _______________________________
+                            result = tryToPickup(messageId, connection);
 
-	private boolean jdbcConnectRetry(Exception exc)
-	{
-		_logger.debug("DB problem, will try to reconnect", exc);
-		
-		cleanup();
-		_conn = null;
+                            // tryToPickup completed without throwing, so commit any delete or
+                            // status change it made when the connection is non-transacted...
+                            if (!transactional) {
+                                connection.commit();
+                            }
 
-		_prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
-		for (int i1 = 0; i1 < 3; i1++)
-		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (Exception e)
-			{
-				try
-				{
-					Thread.sleep(_sleepForRetries);
-				}
-				catch (InterruptedException eInt)
-				{
-					return false;
-				}
-			}
-		}
+                            if (result != null) {
+                                /*
+                                 * If this is fault message, then throw an exception with the contents. With the
+                                 * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
+                                 */
+                                if (Type.isFaultMessage(result)) {
+                                    Factory.createExceptionFromFault(result);
+                                } else {
+                                    return result;
+                                }
+                            }
+                        }
+                    } finally {
+                        try {
+                            resultSet.close();
+                        } catch (Exception e) {
+                            _logger.warn("SQL Exception closing ResultSet", e);
+                        }
+                    }
+                } finally {
+                    try {
+                        listStatement.close();
+                    } catch (Exception e) {
+                        _logger.warn("SQL Exception closing PreparedStatement", e);
+                    }
+                }
+            } catch (FaultMessageException e) {
+                // The picked up message was a fault, generating this exception
+                // in Factory.createExceptionFromFault.  Just rethrow...
+                throw e;
+            } catch (Exception e) {
+                _logger.warn("Exception during pickup", e);
+                if (!transactional) {
+                    try {
+                        connection.rollback();
+                    } catch (SQLException e1) {
+                        _logger.warn("SQL Exception during rollback", e);
+                    }
+                }
+                throw new CourierTransportException(e);
+            } finally {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    _logger.warn("Error closing DataSource Connection.", e);
+                }
+            }
 
-		return !(_conn == null);
-	} // ________________________________
+            try {
+                long lSleep = limit - System.currentTimeMillis();
+                if (_pollLatency < lSleep)
+                    lSleep = _pollLatency;
+                if (lSleep > 0)
+                    Thread.sleep(lSleep);
+            }
+            catch (InterruptedException e) {
+                return null;
+            }
+        } while (System.currentTimeMillis() <= limit);
 
-	private JdbcCleanConn getConn() throws SQLException, MalformedEPRException, NamingException
-	{
-		if (null == _conn)
-		{
-			try
-			{
-				DataSource DS = null;
-				if (_epr.getDatasource() == null) {
-					DS = new SimpleDataSource(_epr.getDriver(), 
-						_epr.getURL(), _epr.getUserName(), _epr.getPassword());
-				} else {
-					InitialContext initContext;
-					try {
-						initContext = new InitialContext();
-						DS = (DataSource) initContext.lookup(_epr.getDatasource());
-					} catch (NamingException e) {
-						_logger.error("Problem resolving DataSource through JNDI", e);
-						
-						throw e; // it'll get wrapped later anyway!
-					}
-				}
-				_conn = new JdbcCleanConn(DS, transactional);
-			}
-			catch (URISyntaxException ex)
-			{
-				throw new MalformedEPRException(ex);
-			}
-		}
-		return _conn;
-	} // ________________________________
+        return null;
+    }
 
-	protected PreparedStatement listStatement() throws SQLException
+    private Message tryToPickup(String messageId, Connection connection) throws CourierException, SQLException
 	{
-		if (null == _prepGetList)
-		{
-			try
-			{
-				String[] columns =
-				{ _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
+        PreparedStatement selectUpdateStatement = jdbcFactory.createSelect4UpdateStatement(connection);
 
-				StringBuilder sb = new StringBuilder("select");
-				int i1 = 0;
-				for (String col : columns)
-					sb.append((i1++ < 1) ? " " : ",").append(col);
-				sb.append(" from ").append(_epr.getTableName());
-				sb.append(" where ").append(_epr.getStatusColumn())
-						.append("='").append(State.Pending.getColumnValue())
-						.append("'").append(" order by 2");
-				_prepGetList = getConn().prepareStatement(sb.toString());
-			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
-			catch (Exception e)
-			{
-				e.printStackTrace();
-				_logger.debug("Unable to prepare SQL statement", e);
-				throw new SQLException("Problem encountered when trying to created PreparedStatement: "+e);
-			}
-		}
-		
-		return _prepGetList;
-	} // ________________________________
+        try {
+            selectUpdateStatement.setString(1, messageId);
+            selectUpdateStatement.setString(2, State.Pending.getColumnValue());
 
-	protected PreparedStatement select4UpdateStatement()
-	{
-		if (_prepSel4Upd == null)
-		{
-			try
-			{
-				/*
-				 * TODO make this dynamic using a factory pattern.
-				 */
+            ResultSet resultSet = selectUpdateStatement.executeQuery();
+            try
+            {
+                if (resultSet.next())
+                {
+                    Message result = null;
 
-				StringBuilder sb = null;
+                    try
+                    {
+                        Serializable blob = (Serializable) resultSet.getObject(1);
+                        result = Util.deserialize(blob);
+                    }
+                    catch (Exception e)
+                    {
+                        // If there's an error deserializing the message blob, we either
+                        // delete the message (deleteOnError), or change its state
+                        // to "State.Error" i.e. no exceptions/rollbacks...
+                        result = null;
+                    } finally {
+                        if (result == null && deleteOnError) {
+                            deleteMsg(messageId, connection);
+                        } else if (result != null && deleteOnSuccess) {
+                            deleteMsg(messageId, connection);
+                        } else if(result == null) {
+                            changeStatus(messageId, State.Error, connection);
+                        } else {
+                            changeStatus(messageId, State.Done, connection);
+                        }
+                    }
 
-				if (!_epr.getURL().contains("hsqldb"))
-				{
-					sb = new StringBuilder("select ").append(
-							_epr.getDataColumn()).append(" from ").append(
-							_epr.getTableName()).append(" where ").append(
-							_epr.getMessageIdColumn()).append("=?").append(
-							" and ").append(_epr.getStatusColumn())
-							.append("=?").append(" for update");
-				}
-				else
-				{
-					/*
-					 * HSQL does not support FOR UPDATE! All tables appear to
-					 * be inherently updatable!
-					 */
-					
-					sb = new StringBuilder("select ").append(
-							_epr.getDataColumn()).append(" from ").append(
-							_epr.getTableName()).append(" where ").append(
-							_epr.getMessageIdColumn()).append("=?").append(
-							" and ").append(_epr.getStatusColumn())
-							.append("=?");
-				}
+                    return result;
+                }
+            }
+            finally
+            {
+                try
+                {
+                    resultSet.close();
+                } catch (final Exception ex) {
+                    _logger.warn("Could not close ResultSet.", ex);
+                }
+            }
+        } finally {
+            selectUpdateStatement.close();
+        }
 
-				_prepSel4Upd = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		}
+        return null;
+	}
 
-		return _prepSel4Upd;
-	} // ________________________________
-
-	protected PreparedStatement updateStatusStatement()
+    private void deleteMsg(String messageId, Connection connection) throws SQLException
 	{
-		if (null == _prepUpdateStatus)
-			try
-			{
-				StringBuilder sb = new StringBuilder("update ").append(
-						_epr.getTableName()).append(" set ").append(
-						_epr.getStatusColumn()).append("= ?").append(" where ")
-						.append(_epr.getMessageIdColumn()).append("=?");
-				_prepUpdateStatus = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepUpdateStatus;
-	} // ________________________________
+        PreparedStatement statement = jdbcFactory.createDeleteStatement(connection);
 
-	protected PreparedStatement insertStatement()
+        try {
+            statement.setString(1, messageId);
+            statement.executeUpdate();
+        }   finally {
+            statement.close();
+        }
+    }
+
+    private void changeStatus(String messageId, State to, Connection connection) throws SQLException
 	{
-		if (null == _prepInsert)
-			try
-			{
-				String[] columns =
-				{ _epr.getMessageIdColumn(), _epr.getDataColumn(),
-						_epr.getStatusColumn(), _epr.getTimestampColumn() };
+        PreparedStatement statement = jdbcFactory.createUpdateStatusStatement(connection);
 
-				StringBuilder sb = new StringBuilder("insert into ").append(
-						_epr.getTableName()).append("(");
-				int i1 = 0;
-				for (String col : columns)
-					sb.append((i1++ < 1) ? " " : ",").append(col);
-				sb.append(") values (?,?,?,?)");
-				_prepInsert = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepInsert;
-	} // ________________________________
+        try {
+            statement.setString(1, to.getColumnValue());
+            statement.setString(2, messageId);
+            statement.executeUpdate();
+        } finally {
+            statement.close();
+        }
+    }
 
-	protected PreparedStatement deleteStatement()
+    public static enum State
 	{
-		if (null == _prepDelete)
-			try
-			{
-				StringBuilder sb = new StringBuilder("delete from ").append(
-						_epr.getTableName()).append(" where ").append(
-						_epr.getMessageIdColumn()).append(" =?");
-				_prepDelete = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepDelete;
-	} // ________________________________
-
-	protected enum State
-	{
 		Pending, WorkInProgress, Done, Error;
-		String getColumnValue()
+
+        public String getColumnValue()
 		{
 			return toString().substring(0, 1);
 		}
-	}
 
-	public void setPollLatency(Long millis)
+    }
+
+    public void setPollLatency(Long millis)
 	{
 		if (millis <= 200)
 			_logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of "+_pollLatency);
 		else
 			_pollLatency = millis;
-	} // ________________________________
-	
-	protected long _pollLatency = 200;
-	protected long _sleepForRetries = 3000; // milliseconds
+	}
 
-	protected boolean _postDelete, _errorDelete;
-	protected boolean _isReceiver;
+    private boolean isTransactional() throws CourierException {
+        boolean transactional;
+        try
+        {
+            TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+            Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+            boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
 
-	protected JDBCEpr _epr;
+            transactional = (txHandle != null);
 
-	protected JdbcCleanConn _conn;
+            /*
+            * Make sure the current transaction is still active! If we
+            * have previously slept, then the timeout may be longer than that
+            * associated with the transaction.
+            */
 
-	protected PreparedStatement _prepGetList;
-	protected PreparedStatement _prepSel4Upd;
-	protected PreparedStatement _prepUpdateStatus;
-	protected PreparedStatement _prepInsert;
-	protected PreparedStatement _prepDelete;
-	
-	private boolean transactional = false;
-
-	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+            if (transactional && !isActive)
+            {
+                throw new CourierException("Associated transaction is no longer active!");
+            }
+        }
+        catch (TransactionStrategyException ex)
+        {
+            throw new CourierException(ex);
+        }
+        return transactional;
+    }
 }
\ No newline at end of file

Copied: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java (from rev 19569, labs/jbossesb/tags/JBESB_4_2_1_GA_CP2_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java)
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	                        (rev 0)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers.helpers;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.SqlTableCourier;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierServiceBindException;
+import org.jboss.soa.esb.couriers.CourierTransportException;
+import org.jboss.soa.esb.util.ClassUtil;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.DriverManager;
+
+/**
+ * Factory for creating JDBCEpr based database resources for the SqlTableCourier.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JDBCEprDBResourceFactory {
+
+    private static Logger logger = Logger.getLogger(JDBCEprDBResourceFactory.class);
+
+    private JDBCEpr epr;
+
+    private DataSource dataSource;
+    private String insertStatementSQL;
+    private String listStatementSQL;
+    private String select4UpdateStatementSQL;
+    private String updateStatusStatementSQL;
+    private String deleteStatementSQL;
+
+    public JDBCEprDBResourceFactory(JDBCEpr epr) throws CourierServiceBindException {
+        this.epr = epr;
+
+        if (epr.getDatasource() != null) {
+            lookupDataSource(epr);
+        } else {
+            try {
+                try {
+                    ClassUtil.forName(epr.getDriver(), getClass());
+                } catch (ClassNotFoundException e) {
+                    throw new CourierServiceBindException("Database driver '" + epr.getDriver() + "' not available on classpath.");
+                }
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+            }
+        }
+    }
+
+
+    public Connection createConnection(boolean transactional) throws CourierTransportException, CourierServiceBindException {
+        Connection connection;
+
+        try {
+            if(dataSource != null) {
+                connection = dataSource.getConnection();
+            } else {
+                try {
+                    connection = DriverManager.getConnection(epr.getURL(), epr.getUserName(), epr.getPassword());
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+                }
+            }
+        } catch (SQLException e) {
+            throw new CourierTransportException("Failed to connect to DataSource.", e);
+        }
+
+        if (!transactional) {
+            try {
+                connection.setAutoCommit(false);
+            } catch (SQLException e) {
+                try {
+                    connection.close();
+                } catch (SQLException e1) {
+                    logger.error("Failed to close connection.", e1);
+                }
+                throw new CourierTransportException("Failed to turn off autoCommit on connection..", e);
+            }
+        }
+
+        return connection;
+    }
+
+    public PreparedStatement createListStatement(Connection connection) throws SQLException {
+        if (listStatementSQL == null) {
+            listStatementSQL = buildListStatementSQL();
+        }
+        return connection.prepareStatement(listStatementSQL);
+    }
+
+    public PreparedStatement createSelect4UpdateStatement(Connection connection) throws SQLException {
+        if (select4UpdateStatementSQL == null) {
+            select4UpdateStatementSQL = buildSelect4UpdateStatementSQL();
+        }
+        return connection.prepareStatement(select4UpdateStatementSQL);
+    }
+
+    public PreparedStatement createUpdateStatusStatement(Connection connection) throws SQLException {
+        if (updateStatusStatementSQL == null) {
+            updateStatusStatementSQL = buildUpdateStatusStatementSQL();
+        }
+        return connection.prepareStatement(updateStatusStatementSQL);
+    }
+
+    public PreparedStatement createInsertStatement(Connection connection) throws SQLException {
+        if (insertStatementSQL == null) {
+            insertStatementSQL = buildInsertStatementSQL();
+        }
+        return connection.prepareStatement(insertStatementSQL);
+    }
+
+    public PreparedStatement createDeleteStatement(Connection connection) throws SQLException {
+        if (deleteStatementSQL == null) {
+            deleteStatementSQL = buildDeleteStatementSQL();
+        }
+        return connection.prepareStatement(deleteStatementSQL);
+    }
+
+    private String buildSelect4UpdateStatementSQL() {
+        StringBuilder sb = new StringBuilder("select ");
+
+        try {
+            if (!epr.getURL().contains("hsqldb")) {
+                sb = sb.append(
+                        epr.getDataColumn()).append(" from ").append(
+                        epr.getTableName()).append(" where ").append(
+                        epr.getMessageIdColumn()).append("=?").append(
+                        " and ").append(epr.getStatusColumn())
+                        .append("=?").append(" for update");
+            } else {
+                /*
+                 * HSQL does not support FOR UPDATE! All tables appear to
+                 * be inherently updatable!
+                 */
+                sb = sb.append(
+                        epr.getDataColumn()).append(" from ").append(
+                        epr.getTableName()).append(" where ").append(
+                        epr.getMessageIdColumn()).append("=?").append(
+                        " and ").append(epr.getStatusColumn())
+                        .append("=?");
+            }
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+
+        return sb.toString();
+    }
+
+    private String buildUpdateStatusStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder("update ").append(
+                    epr.getTableName()).append(" set ").append(
+                    epr.getStatusColumn()).append("= ?").append(" where ")
+                    .append(epr.getMessageIdColumn()).append("=?");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private String buildInsertStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("insert into ").append(epr.getTableName());
+            sb.append(" (");
+            sb.append(epr.getMessageIdColumn()).append(", ");
+            sb.append(epr.getDataColumn()).append(", ");
+            sb.append(epr.getStatusColumn()).append(", ");
+            sb.append(epr.getTimestampColumn());
+            sb.append(") values (?,?,?,?)");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private String buildListStatementSQL() {
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("select ");
+            sb.append(epr.getMessageIdColumn()).append(", ");
+            sb.append(epr.getTimestampColumn());
+            sb.append(" from ").append(epr.getTableName());
+            sb.append(" where ").append(epr.getStatusColumn());
+            sb.append(" = '").append(SqlTableCourier.State.Pending.getColumnValue()).append("'");
+            sb.append(" order by 2");
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+
+        return sb.toString();
+    }
+
+    private String buildDeleteStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder("delete from ").append(
+                    epr.getTableName()).append(" where ").append(
+                    epr.getMessageIdColumn()).append(" =?");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private void lookupDataSource(JDBCEpr epr) throws CourierServiceBindException {
+        try
+        {
+            if (epr.getDatasource() != null) {
+                InitialContext initContext;
+                try {
+                    initContext = new InitialContext();
+                    dataSource = (DataSource) initContext.lookup(epr.getDatasource());
+                } catch (NamingException e) {
+                    logger.error("Problem resolving DataSource through JNDI", e);
+                    throw e; // it'll get wrapped later anyway!
+                }
+            }
+        }
+        catch (final Exception e)
+        {
+            throw new CourierServiceBindException("Failed to lookup DataSource.", e);
+        }
+    }
+}

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -188,11 +188,6 @@
             ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
             if (freeSessions.size() > 0)
             {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Returning session, poolsize=" + getSessionsInPool() 
-                            + ", maxsize=" + MAX_SESSIONS 
-                            + ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
-                }
                 final JmsSession session = freeSessions.remove(freeSessions.size()-1);
                 inUseSessions.add(session);
                 return session ;

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -61,12 +61,12 @@
     /**
      * Cleanup actions
      */
-    private enum Cleanup { close, release }
+    private enum Cleanup { close, release, none }
     
     /**
      * The cleanup action for the synchronization.
      */
-    private Cleanup cleanupAction = Cleanup.close ;
+    private Cleanup cleanupAction = Cleanup.none ;
     
     /**
      * Create the session wrapper.
@@ -132,14 +132,28 @@
         return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
     }
     
-    protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+    protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
     {
-        cleanupAction = Cleanup.close ;
+        if (associated)
+        {
+            cleanupAction = Cleanup.close ;
+        }
+        else
+        {
+            pool.handleCloseSession(this) ;
+        }
     }
     
-    protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+    protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
     {
-        cleanupAction = Cleanup.release ;
+        if (associated)
+        {
+            cleanupAction = Cleanup.release ;
+        }
+        else
+        {
+            pool.handleReleaseSession(this) ;
+        }
     }
     
     protected synchronized void associate()
@@ -147,6 +161,7 @@
     {
         if (!associated)
         {
+            cleanupAction = Cleanup.none ;
             final XAResource resource = session.getXAResource() ;
             final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
             try
@@ -190,6 +205,8 @@
         case release:
             pool.handleReleaseSession(this) ;
             break ;
+        case none:
+            // Reference held by caller
         }
         associated = false ;
     }

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -83,17 +83,21 @@
 	
 	public void release()
 	{
-		if ((null != m_conn) && (!transactional))
+		if (null != m_conn)
 		{
-			try
+			if (!transactional)
 			{
-				m_conn.rollback();
+				try
+				{
+					m_conn.rollback();
+				}
+				catch (Exception eRoll)
+				{
+				}
 			}
-			catch (Exception eRoll)
-			{
-			}
 
 			for (PreparedStatement PS : m_olPrepSt)
+			{
 				try
 				{
 					PS.close();
@@ -101,6 +105,7 @@
 				catch (Exception e)
 				{
 				}
+			}
 			try
 			{
 				m_conn.close();

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -32,12 +32,13 @@
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.couriers.CourierUtil;
 import org.jboss.soa.esb.couriers.FaultMessageException;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -86,6 +87,10 @@
          */
         private long errorDelay ;
 
+        private TransactionStrategy transactionStrategy;
+        private boolean transactional = false;
+        private boolean rollbackOnPipelineFaults = true;
+
         /**
 	 * public constructor
 	 *
@@ -156,6 +161,11 @@
                     }
                 }
                 _latencySecs = lSeconds ;
+                
+                transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+                transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+                
+                rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
 	}
 
         /**
@@ -170,32 +180,20 @@
             try
             {
                 pipeline = new ActionProcessingPipeline(_config) ;
+                pipeline.setTransactional(transactional);
                 pipeline.initialise() ;
             }
             catch (final ConfigurationException ce)
             {
                 throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
             }
+            
             this.pipeline = pipeline ;
-            final TwoWayCourier pickUpCourier ;
+            final PickUpOnlyCourier pickUpCourier ;
             try
             {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
-                }
+                pickUpCourier = getCourier() ;
+                cleanCourier(pickUpCourier) ;
             }
             catch (final MalformedEPRException mepre)
             {
@@ -205,16 +203,13 @@
             {
                 throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
             }
-
-            _pickUpCourier = pickUpCourier ;
-
+            
             try
             {
                 RegistryUtil.register(_config, _epr);
             }
             catch (final RegistryException re)
             {
-                CourierUtil.cleanCourier(_pickUpCourier);
                 throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
             }
         }
@@ -261,17 +256,49 @@
             }
         }
 
+        /**
+         * We have JMS transactional delivery/work semantics: before pulling a unit of work
+         * we start a transaction. If the pipeline completes successfully then we will
+         * commit that transaction and the UOW will be deleted. If we have to roll back
+         * the transaction then the UOW will be placed back on the input "queue" (assumes that
+         * the courier is transactional).
+         * 
+         * @param maxWaitMillis
+         */
 	public void waitForEventAndProcess (long maxWaitMillis)
 	{
 		Message message = null ;
+		boolean problem = false;
+		
+                PickUpOnlyCourier pickUpCourier = null ;
 		try
 		{
-			message = (maxWaitMillis > 0) ? _pickUpCourier
+			transactionStrategy.begin();
+			
+			pickUpCourier = getCourier() ;
+			
+			message = (maxWaitMillis > 0) ? pickUpCourier
 					.pickup(maxWaitMillis) : null;
                         errorDelay = 0 ;
 		}
+		catch (TransactionStrategyException ex)
+		{
+			_logger.error("Could not begin transaction!");
+			
+			problem = true;
+			
+			return;
+		}
+                catch (MalformedEPRException e)
+                {
+                        problem = true;
+                        
+                        return;
+                }
 		catch (CourierTimeoutException e)
 		{
+			problem = true;
+			
 			return;
 		}
 		catch (FaultMessageException fme)
@@ -291,25 +318,39 @@
                         }
                         _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
                         waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+			problem = true;
+			
 			return;
 		}
+		finally
+		{
+			if (problem || (message == null))
+			{
+			    cleanCourier(pickUpCourier) ;
+			
+				rollbackTransaction();
+			}
+		}
 
 		if (null != message)
 		{
-                    final Message pipelineMessage = message ;
-                    final Runnable pipelineRunner = new Runnable() {
-                        public void run() {
-                            try {
-                                pipeline.process(pipelineMessage) ;
-                            } finally {
-                                updateThreadCount(-1) ;
-                            }
-                        }
-                    } ;
-                    updateThreadCount(+1);
-                    _execService.execute(pipelineRunner);
+			try
+			{
+				final Message pipelineMessage = message ;
+				final Object txHandle = transactionStrategy.suspend();
+				final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
+				
+				updateThreadCount(+1);
+				_execService.execute(txRunner);
+			}
+			catch (TransactionStrategyException ex)
+			{
+				_logger.warn("Caught transaction related exception: ", ex);
+				cleanCourier(pickUpCourier);
+				rollbackTransaction();
+			}
 		}
-
 	} // ________________________________
 
         /**
@@ -403,7 +444,131 @@
                 }
             }
         }
+        
+        private void rollbackTransaction ()
+        {
+        	try
+        	{
+        		transactionStrategy.rollbackOnly();
+        		transactionStrategy.terminate();
+        	}
+        	catch (Throwable ex)
+        	{
+        		_logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+        	}
+        }
+        
+        private PickUpOnlyCourier getCourier()
+            throws MalformedEPRException, CourierException
+        {
+            PickUpOnlyCourier pickUpCourier = _pickUpCourier;
+            if (transactional || (pickUpCourier == null))
+            {
+                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+                try
+                {
+                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                        "setPollLatency", new Class[] { Long.class });
+                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+                }
+                catch (final NoSuchMethodException nsme)
+                {
+                        // OK, just leave it null
+                }
+                catch (final Throwable th)
+                {
+                    CourierUtil.cleanCourier(pickUpCourier);
+                    throw new CourierException("Problems invoking setPollLatency(long)", th);
+                }
+                
+                if (!transactional)
+                {
+                    _pickUpCourier = pickUpCourier ;
+                }
+            }
 
+            return pickUpCourier;
+        }
+
+        private void cleanCourier(final PickUpOnlyCourier pickUpOnlyCourier)
+        {
+            if (transactional)
+            {
+                CourierUtil.cleanCourier(pickUpOnlyCourier) ;
+            }
+        }
+
+        class TransactionalRunner implements Runnable
+        {
+        	public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+        	{
+        		_courier = courier;
+        		_pipelineMessage = pipelineMessage;
+        		_txHandle = txHandle;
+        	}
+        	
+        	public void run()
+        	{
+        		boolean problem = false;
+        		
+        		try
+        		{
+        			if (_txHandle != null)
+        			{
+        			    transactionStrategy.resume(_txHandle);
+        			}
+        			
+        			/*
+        			 * Current strategy is to commit as long as process returns true.
+        			 * If it fails, or any exceptions are caught, then we roll back.
+        			 * 
+        			 * TODO re-examine the semantics around true/false from the pipeline.
+        			 */
+        			
+        			// TODO consider adding a RollbackOnFalse option to allow override.
+        			
+        			problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+        			if (!problem)
+        			{
+        				transactionStrategy.terminate();
+        			}
+        		}
+        		catch (TransactionStrategyException ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught transaction exception: ", ex);
+        		}
+        		catch (RuntimeException ex)
+        		{
+        			problem = true;
+        			
+        			throw ex;
+        		}
+        		catch (Throwable ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught throwable: ",ex);
+        		}
+        		finally
+        		{
+        		    cleanCourier(_courier);
+        			if (problem)
+        			{
+        				rollbackTransaction();
+        			}
+        			
+        			updateThreadCount(-1);
+        		}
+        	}
+        	
+        	private PickUpOnlyCourier _courier;
+        	private Message _pipelineMessage;
+        	private Object _txHandle;
+        }
+        
         private ConfigTree _config;
 
         private String _eprCategoryName;

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -33,8 +33,11 @@
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.util.DefaultReplyTo;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
 import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
 import org.jboss.soa.esb.message.Message;
@@ -118,52 +121,43 @@
 
     
   @Test
-    public void testJdbcReplyEpr()
-    {
+    public void testJdbcReplyEpr() throws MalformedEPRException, CourierException, URISyntaxException, CourierTimeoutException {
   		_logger.info("_________________________________________");
     	_logger.info("testJdbcReplyEpr() invoked");
-        try
-        {
-        	//  Send a Message that will be picked up by a listener, and specify replyTo
-        	JDBCEpr toEpr = getEpr("foo");
-        	JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
+        //  Send a Message that will be picked up by a listener, and specify replyTo
+        JDBCEpr toEpr = getEpr("foo");
+        JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
 
-        	String text_1 = "Outgoing";
-        	Message outgoingMsg = MessageFactory.getInstance().getMessage();
-        	outgoingMsg.getHeader().getCall().setTo(toEpr);
-        	outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
-        	outgoingMsg.getBody().add(text_1.getBytes());
-        	CourierUtil.deliverMessage(outgoingMsg);
+        String text_1 = "Outgoing";
+        Message outgoingMsg = MessageFactory.getInstance().getMessage();
+        outgoingMsg.getHeader().getCall().setTo(toEpr);
+        outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
+        outgoingMsg.getBody().add(text_1.getBytes());
+        CourierUtil.deliverMessage(outgoingMsg);
 
-        	// Mock a service that picks up the original message and replies
-        	JDBCEpr serviceEpr = getEpr("foo");
-        	PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
-        	Message received = listener.pickup(100);
-        	String text_2 = new String((byte[]) received.getBody().get());
-        	assertTrue(text_1.equals(text_2));
+        // Mock a service that picks up the original message and replies
+        JDBCEpr serviceEpr = getEpr("foo");
+        PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
+        Message received = listener.pickup(100);
+        String text_2 = new String((byte[]) received.getBody().get());
+        assertTrue(text_1.equals(text_2));
 //        	assertTrue(replyToEpr.equals(received.getHeader().getCall().getReplyTo()));
-        	
-        	// now respond to replyTo
-        	text_2	+= " + processed by listener";
-        	Message response = MessageFactory.getInstance().getMessage();
-        	response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
-        	response.getBody().add(text_2.getBytes());
-        	CourierUtil.deliverMessage(response);
-        	
-        	// try to pick up reply
-        	PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
-        	Message finalMsg = waiter.pickup(100);
-        	assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
-        	
-        	_logger.info(text_2+"... and back from jdbc ReplyTo EPR");
-        	_logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
 
-        }
-        catch (Exception e)
-        {
-  			_logger.error(e);
-            assertTrue(false);
-        }
+        // now respond to replyTo
+        text_2	+= " + processed by listener";
+        Message response = MessageFactory.getInstance().getMessage();
+        response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
+        response.getBody().add(text_2.getBytes());
+        CourierUtil.deliverMessage(response);
+
+        // try to pick up reply
+        PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
+        Message finalMsg = waiter.pickup(100);
+        assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
+
+        _logger.info(text_2+"... and back from jdbc ReplyTo EPR");
+        _logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
+
     }
 	private static void dropTable(String tableName) throws Exception
 	{

Copied: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml (from rev 19569, labs/jbossesb/tags/JBESB_4_2_1_GA_CP2_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml)
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml	                        (rev 0)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/deployment.xml	2008-04-15 11:17:46 UTC (rev 19570)
@@ -0,0 +1,3 @@
+<jbossesb-deployment>
+  <depends>jboss.esb:service=QuickstartDatabaseInitializer</depends>
+</jbossesb-deployment>

Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java	2008-04-15 11:08:42 UTC (rev 19569)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/samples/quickstarts/helloworld_tx_sql_action/src/org/jboss/soa/esb/samples/quickstart/helloworldtxsqlaction/MyAction.java	2008-04-15 11:17:46 UTC (rev 19570)
@@ -24,13 +24,11 @@
 
 import org.jboss.soa.esb.actions.AbstractActionLifecycle;
 import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.body.content.BytesBody;
 
 public class MyAction extends AbstractActionLifecycle
 {
-    private static int checkIter = 0;
+    private boolean fail ;
     
   protected ConfigTree	_config;
 	  
@@ -51,11 +49,8 @@
 			  if ("data 22".equals(curr.getValue()))
 			  {
 				  System.out.println("DATA READ: "+curr.getValue());
-				  
-				  if (checkIter++ < 2) {
-					  problem = true;
-                      break;
-                  }
+				  fail = !fail ;
+				  problem = fail ;
               }
 		  }
 		 System.out.println(results.toString());




More information about the jboss-svn-commits mailing list