[jboss-svn-commits] JBL Code SVN: r17426 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product: rosetta/src/org/jboss/soa/esb/common and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Dec 28 20:37:43 EST 2007


Author: mark.little at jboss.com
Date: 2007-12-28 20:37:43 -0500 (Fri, 28 Dec 2007)
New Revision: 17426

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1283

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -22,10 +22,24 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
@@ -36,333 +50,430 @@
 import org.jboss.soa.esb.util.Util;
 import org.xml.sax.SAXParseException;
 
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
+public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
+{
+	/**
+	 * disable default constructor
+	 */
+	private SqlTableCourier()
+	{
+	}
 
-public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier {
-    /**
-     * disable default constructor
-     */
-    private SqlTableCourier() {
-    }
+	/**
+	 * package protected constructor - Objects of Courier should only be
+	 * instantiated by the Factory
+	 * 
+	 * @param epr
+	 */
+	SqlTableCourier(JDBCEpr epr) throws CourierException
+	{
+		this(epr, false);
+	}
 
-    /**
-     * package protected constructor - Objects of Courier should only be
-     * instantiated by the Factory
-     *
-     * @param epr
-     */
-    SqlTableCourier(JDBCEpr epr) throws CourierException {
-        this(epr, false);
-    }
+	/**
+	 * package protected constructor - Objects of Courier should only be
+	 * instantiated by the Factory
+	 * 
+	 * @param epr
+	 */
+	SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
+	{
+		_isReceiver = isReceiver;
+		_epr = epr;
+		_sleepForRetries = 3000;  // TODO magic number - configurable?
+		try
+		{
+			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+					.getPostDelete()));
+			_errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+					.getErrorDelete()));
+		}
+		catch (URISyntaxException e)
+		{
+			throw new CourierException(e);
+		}
 
-    /**
-     * package protected constructor - Objects of Courier should only be
-     * instantiated by the Factory
-     *
-     * @param epr
-     */
-    SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException {
-        _isReceiver = isReceiver;
-        _epr = epr;
-        _sleepForRetries = 3000;  // TODO magic number - configurable?
-        try {
-            _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
-                    .getPostDelete()));
-            _errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
-                    .getErrorDelete()));
-        }
-        catch (URISyntaxException e) {
-            throw new CourierException(e);
-        }
+	} // ________________________________
 
-    } // ________________________________
+	public void cleanup()
+	{
+		if (null != _conn)
+		{
+			try
+			{
+				_conn.release();
+			}
+			catch (Exception e)
+			{
+				e.printStackTrace();
+				_logger.info("Unable to release connection", e);
+			}
+		}
 
-    public void cleanup() {
-        if (null != _conn) {
-            try {
-                _conn.release();
-            }
-            catch (Exception e) {
-                _logger.info("Unable to release connection");
-                _logger.debug("Unable to release connection", e);
-            }
-        }
+	} // ________________________________
 
-    } // ________________________________
+	/**
+	 * package the ESB message in a java.io.Serializable, and write it.
+	 * Delivery occurs within its own transaction.
+	 * 
+	 * @param message
+	 *            Message - the message to deliverAsync
+	 * @return boolean - the result of the delivery
+	 * @throws CourierException -
+	 *             if problems were encountered
+	 */
+	
+	public boolean deliver(Message message) throws CourierException
+	{
+		if (_isReceiver)
+			throw new CourierException("This is a read-only Courier");
 
-    /**
-     * package the ESB message in a java.io.Serializable, and write it
-     *
-     * @param message Message - the message to deliverAsync
-     * @return boolean - the result of the delivery
-     * @throws CourierException -
-     *                          if problems were encountered
-     */
-    public boolean deliver(Message message) throws CourierException {
-        if (_isReceiver)
-            throw new CourierException("This is a read-only Courier");
+		if (null == message)
+			return false;
 
-        if (null == message)
-            return false;
+		String msgId = null;
+		Call call = message.getHeader().getCall();
+		if (null==call)
+			message.getHeader().setCall(call=new Call());
+		try
+		{
+			if (null==call.getMessageID())
+				call.setMessageID(new URI(UUID.randomUUID().toString()));
+			msgId = call.getMessageID().toString();
+		}
+		catch (URISyntaxException e)
+		{
+			throw new CourierException("Problems with message header ",e);
+		}
 
-        String msgId = null;
-        Call call = message.getHeader().getCall();
-        if (null == call)
-            message.getHeader().setCall(call = new Call());
-        try {
-            if (null == call.getMessageID())
-                call.setMessageID(new URI(UUID.randomUUID().toString()));
-            msgId = call.getMessageID().toString();
-        }
-        catch (URISyntaxException e) {
-            throw new CourierException("Problems with message header ", e);
-        }
+		if (null == _conn)
+		{
+			try
+			{
+				_conn = getConn();
+			}
+			catch (Exception e)
+			{
+				throw new CourierException(e);
+			}
+		}
 
-        if (null == _conn) {
-            try {
-                _conn = getConn();
-            }
-            catch (Exception e) {
-                throw new CourierException(e);
-            }
-        }
+		while (_conn != null)
+		{
+			try
+			{
+				int iCol = 1;
+				PreparedStatement PS = insertStatement();
+				PS.setString(iCol++, msgId);
+				PS.setObject(iCol++, Util.serialize(message));
+				PS.setString(iCol++, State.Pending.getColumnValue());
+				PS.setLong(iCol++, System.currentTimeMillis());
 
-        while (_conn != null) {
-            try {
-                int iCol = 1;
-                PreparedStatement PS = insertStatement();
-                PS.setString(iCol++, msgId);
-                PS.setObject(iCol++, Util.serialize(message));
-                PS.setString(iCol++, State.Pending.getColumnValue());
-                PS.setLong(iCol++, System.currentTimeMillis());
+				_conn.execUpdWait(PS, 3);
+				_conn.commit();
+				return true;
+			}
+			catch (SQLException e)
+			{
+				if (null != _conn)
+				{
+					try
+					{
+						_conn.rollback();
+					}
+					catch (Exception roll)
+					{
+						_logger.debug(roll);
+					}
+				}
+				
+				_logger.debug("SQL exception during deliver", e);
+				throw new CourierException(e);
+			}
+			catch (Exception e)
+			{
+				jdbcConnectRetry(e);
+			}
+		}
+		return false;
+	} // ________________________________
 
-                _conn.execUpdWait(PS, 3);
-                _conn.commit();
-                return true;
-            }
-            catch (SQLException e) {
-                if (null != _conn) {
-                    try {
-                        _conn.rollback();
-                    }
-                    catch (Exception roll) {
-                        _logger.debug(roll);
-                    }
-                }
-                
-                _logger.debug("SQL exception during deliver", e);
-                throw new CourierException(e);
-            }
-            catch (Exception e) {
-                jdbcConnectRetry(e);
-            }
-        }
-        return false;
-    } // ________________________________
-
-    public Message pickup(long millis) throws CourierException, CourierTimeoutException {
-        Message result = null;
-        long limit = System.currentTimeMillis()
-                + ((millis < 100) ? 100 : millis);
-        do {
-            try {
+	public Message pickup(long millis) throws CourierException, CourierTimeoutException
+	{
+		Message result = null;
+		long limit = System.currentTimeMillis()
+				+ ((millis < 100) ? 100 : millis);
+		
+		do
+		{
+			try
+			{
+				TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+				Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+				boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+				
+				transactional = (txHandle != null);
+				
+				/*
+				 * Make sure the current transaction is still active! If we
+				 * have previously slept, then the timeout may be longer than that
+				 * associated with the transaction.
+				 */
+				
+				/*
+				 * MessageAwareListener will catch exceptions and roll back the transaction.
+				 */
+				
+				if (transactional && !isActive)
+				{
+					throw new CourierException("Associated transaction is no longer active!");
+				}
+			}
+			catch (TransactionStrategyException ex)
+			{
+				_logger.error("Could not determine transaction association!", ex);
+				
+				throw new CourierException("Could not determine transaction association!");
+			}
+			
+			try
+            {
                 ResultSet RS = getRowList();
-                while (null != RS && RS.next()) {
-                    String messageId = RS.getString(1);
-                    if (null == (result = tryToPickup(messageId)))
-                        continue;
 
-                    /*
-                          * If this is fault message, then throw an exception with the contents. With the
-                          * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
-                          */
+				while (null != RS && RS.next())
+				{
+					String messageId = RS.getString(1);
 
-                    if (Type.isFaultMessage(result))
-                        Factory.createExceptionFromFault(result);
-
-                    return result;
-                }
+					if (null == (result = tryToPickup(messageId)))
+						continue;
+					
+					/*
+					 * If this is fault message, then throw an exception with the contents. With the
+					 * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
+					 */
+					
+					if (Type.isFaultMessage(result))
+					    Factory.createExceptionFromFault(result);
+					
+					return result;
+				}
             }
+			catch (SQLException e)
+			{
+				_logger.debug("SQL Exception during pickup", e);
+				return null;
+			}
+                        finally
+                        {
+                            // Added to make sure we release transactions from all paths
+                            if (_conn != null)
+                            {
+                                try
+                                {
+                                	if (!transactional)
+                                		_conn.rollback() ;
+                                }
+                                catch (final SQLException sqle) {} //ignore
+                            }
+                        }
+                        try
+                        {
+                                long lSleep = limit - System.currentTimeMillis();
+                                if (_pollLatency < lSleep)
+                                        lSleep = _pollLatency;
+                                if (lSleep > 0)
+                                        Thread.sleep(lSleep);
+                        }
+                        catch (InterruptedException e)
+                        {
+                                return null;
+                        }
+		} while (System.currentTimeMillis() <= limit);
+		return null;
+	} // ________________________________
 
-            catch (SQLException e) {
-                _logger.debug("SQL Exception during pickup", e);
-                return null;
-            }
-            finally {
-                // Added to make sure we release transactions from all paths
-                if (_conn != null) {
-                    try {
-                        _conn.rollback();
-                    }
-                    catch (final SQLException sqle) {
-                    } //ignore
-                }
-            }
-            try {
-                long lSleep = limit - System.currentTimeMillis();
-                if (_pollLatency < lSleep)
-                    lSleep = _pollLatency;
-                if (lSleep > 0)
-                    Thread.sleep(lSleep);
-            }
-            catch (InterruptedException e) {
-                return null;
-            }
-        } while (System.currentTimeMillis() <= limit);
-        return null;
-    } // ________________________________
+	private Message tryToPickup(String messageId) throws CourierException,
+			SQLException
+	{
+		int iParm = 1;
 
-    private Message tryToPickup(String messageId) throws CourierException,
-            SQLException {
-        int iParm = 1;
+		select4UpdateStatement().setString(iParm++, messageId);
+		select4UpdateStatement().setString(iParm++,
+				State.Pending.getColumnValue());
 
-        select4UpdateStatement().setString(iParm++, messageId);
-        select4UpdateStatement().setString(iParm++,
-                State.Pending.getColumnValue());
+		while (_conn != null)
+		{
+			try
+			{
+				ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
+				while (RS.next())
+				{
+					Exception eBad = null;
+					try
+					{
+						Message result = Util.deserialize((Serializable) RS
+								.getObject(1));
+						if (_postDelete)
+							deleteMsg(messageId);
+						else
+							changeStatus(messageId, State.Done);
+						return result;
+					}
+					catch (ClassCastException e)
+					{
+						eBad = e;
+					}
+					catch (SAXParseException e)
+					{
+						eBad = e;
+					}
+					catch (Exception e)
+					{
+						throw new CourierException(e);
+					}
+					if (null != eBad)
+					{
+						if (_errorDelete)
+							deleteMsg(messageId);
+						else
+							changeStatus(messageId, State.Error);
+						continue;
+					}
+				}
+				return null;
+			}
+			catch (SQLException e)
+			{
+				throw new CourierException(e);
+			}
+			catch (Exception e)
+			{
+				jdbcConnectRetry(e);
+			}
+		}
+		return null;
+	} // ________________________________
 
-        while (_conn != null) {
-            try {
-                ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
-                while (RS.next()) {
-                    Exception eBad = null;
-                    try {
-                        Message result = Util.deserialize((Serializable) RS
-                                .getObject(1));
-                        if (_postDelete)
-                            deleteMsg(messageId);
-                        else
-                            changeStatus(messageId, State.Done);
-                        return result;
-                    }
-                    catch (ClassCastException e) {
-                        eBad = e;
-                    }
-                    catch (SAXParseException e) {
-                        eBad = e;
-                    }
-                    catch (Exception e) {
-                        throw new CourierException(e);
-                    }
-                    if (null != eBad) {
-                        if (_errorDelete)
-                            deleteMsg(messageId);
-                        else
-                            changeStatus(messageId, State.Error);
-                        continue;
-                    }
-                }
-                return null;
-            }
-            catch (SQLException e) {
-                throw new CourierException(e);
-            }
-            catch (Exception e) {
-                jdbcConnectRetry(e);
-            }
-        }
-        return null;
-    } // ________________________________
+	private void deleteMsg(String messageId) throws SQLException
+	{
+		int iParm = 1;
+		deleteStatement().setString(iParm++, messageId);
+		_conn.execUpdWait(deleteStatement(), 3);
+		
+		if (!transactional)
+			_conn.commit();
+	}
 
-    private void deleteMsg(String messageId) throws SQLException {
-        int iParm = 1;
-        deleteStatement().setString(iParm++, messageId);
-        _conn.execUpdWait(deleteStatement(), 3);
-        _conn.commit();
-    }
+	private void changeStatus(String messageId, State to) throws SQLException
+	{
+		int iParm = 1;
+		updateStatusStatement().setString(iParm++, to.getColumnValue());
+		updateStatusStatement().setString(iParm++, messageId);
+		_conn.execUpdWait(updateStatusStatement(), 3);
+		
+		if (!transactional)
+			_conn.commit();
+	}
 
-    private void changeStatus(String messageId, State to) throws SQLException {
-        int iParm = 1;
-        updateStatusStatement().setString(iParm++, to.getColumnValue());
-        updateStatusStatement().setString(iParm++, messageId);
-        _conn.execUpdWait(updateStatusStatement(), 3);
-        _conn.commit();
-    }
+	private ResultSet getRowList() throws CourierException
+	{
+		if (null == _conn)
+		{
+			try
+			{
+				_conn = getConn();
+			}
+			catch (Exception e)
+			{
+				throw new CourierException(e);
+			}
+		}
+		while (_conn != null)
+		{
+			try
+			{
+				return _conn.execQueryWait(listStatement(), 3);
+			}
+			catch (Exception e)
+			{
+				_logger.debug("Problem encountered while executing query.", e);
+				e.printStackTrace();
+				
+				jdbcConnectRetry(e);
+			}
+		}
+		return null;
 
-    private ResultSet getRowList() throws CourierException {
-        if (null == _conn) {
-            try {
-                _conn = getConn();
-            }
-            catch (Exception e) {
-                throw new CourierException(e);
-            }
-        }
-        while (_conn != null) {
-            try {
-                return _conn.execQueryWait(listStatement(), 3);
-            }
-            catch (Exception e) {
-                jdbcConnectRetry(e);
-            }
-        }
-        return null;
+	} // _______________________________
 
-    } // _______________________________
+	private void jdbcConnectRetry(Exception exc)
+	{
+		_logger.debug("DB problem, will try to reconnect", exc);
+		
+		cleanup();
+		_conn = null;
 
-    private void jdbcConnectRetry(Exception exc) {
-        _logger.debug("DB problem, will try to reconnect", exc);
-        if (null != _conn)
-            _conn.release();
-        _conn = null;
+		_prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
+		for (int i1 = 0; i1 < 3; i1++)
+		{
+			try
+			{
+				_conn = getConn();
+			}
+			catch (Exception e)
+			{
+				try
+				{
+					Thread.sleep(_sleepForRetries);
+				}
+				catch (InterruptedException eInt)
+				{
+					return;
+				}
+			}
+		}
+	} // ________________________________
 
-        _prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
-        for (int i1 = 0; i1 < 3; i1++) {
-            try {
-                _conn = getConn();
-            }
-            catch (Exception e) {
-                try {
-                    Thread.sleep(_sleepForRetries);
-                }
-                catch (InterruptedException eInt) {
-                    return;
-                }
-            }
-        }
-    } // ________________________________
+	private JdbcCleanConn getConn() throws SQLException, MalformedEPRException, NamingException
+	{
+		if (null == _conn)
+		{
+			try
+			{
+				DataSource DS = null;
+				if (_epr.getDatasource() == null) {
+					DS = new SimpleDataSource(_epr.getDriver(), 
+						_epr.getURL(), _epr.getUserName(), _epr.getPassword());
+				} else {
+					InitialContext initContext;
+					try {
+						initContext = new InitialContext();
+						DS = (DataSource) initContext.lookup(_epr.getDatasource());
+					} catch (NamingException e) {
+						_logger.error("Problem resolving DataSource through JNDI", e);
+						
+						throw e; // it'll get wrapped later anyway!
+					}
+				}
+				_conn = new JdbcCleanConn(DS, transactional);
+			}
+			catch (URISyntaxException ex)
+			{
+				throw new MalformedEPRException(ex);
+			}
+		}
+		return _conn;
+	} // ________________________________
 
-    private JdbcCleanConn getConn() throws SQLException, MalformedEPRException {
-        if (null == _conn) {
-            try {
-                DataSource DS = null;
-                if (_epr.getDatasource() == null) {
-                    DS = new SimpleDataSource(_epr.getDriver(),
-                            _epr.getURL(), _epr.getUserName(), _epr.getPassword());
-                } else {
-                    InitialContext initContext;
-                    try {
-                        initContext = new InitialContext();
-                        DS = (DataSource) initContext.lookup(_epr.getDatasource());
-                    } catch (NamingException e) {
-                        _logger.error("", e);
-                    }
-                }
-                _conn = new JdbcCleanConn(DS);
-            }
-            catch (URISyntaxException ex) {
-                throw new MalformedEPRException(ex);
-            }
-        }
-        return _conn;
-    } // ________________________________
-
 	protected PreparedStatement listStatement() throws SQLException
 	{
-        if (null == _prepGetList)
+		if (null == _prepGetList)
+		{
+			try
+			{
+				String[] columns =
+				{ _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
 
-            try {
-                String[] columns =
-                        {_epr.getMessageIdColumn(), _epr.getTimestampColumn()};
-
 				StringBuilder sb = new StringBuilder("select");
 				int i1 = 0;
 				for (String col : columns)
@@ -379,15 +490,16 @@
 			}
 			catch (Exception e)
 			{
-				_logger.warn("Unable to prepare SQL statement", e);
-				
-				throw new SQLException("Unable to prepare SQL statement: "+e);
+				e.printStackTrace();
+				_logger.debug("Unable to prepare SQL statement", e);
+				throw new SQLException("Problem encountered when trying to created PreparedStatement: "+e);
 			}
-			
+		}
+		
 		return _prepGetList;
 	} // ________________________________
 
-	protected PreparedStatement select4UpdateStatement() throws SQLException
+	protected PreparedStatement select4UpdateStatement()
 	{
 		if (_prepSel4Upd == null)
 		{
@@ -397,46 +509,45 @@
 				 * TODO make this dynamic using a factory pattern.
 				 */
 
-                StringBuilder sb = null;
+				StringBuilder sb = null;
 
-                if (!_epr.getURL().contains("hsqldb")) {
-                    sb = new StringBuilder("select ").append(
-                            _epr.getDataColumn()).append(" from ").append(
-                            _epr.getTableName()).append(" where ").append(
-                            _epr.getMessageIdColumn()).append("=?").append(
-                            " and ").append(_epr.getStatusColumn())
-                            .append("=?").append(" for update");
-                } else {
-                    /*
-                          * HSQL does not support FOR UPDATE! All tables appear to
-                          * be inherently updatable!
-                          */
-                	sb = new StringBuilder("select ").append(
-                            _epr.getDataColumn()).append(" from ").append(
-                            _epr.getTableName()).append(" where ").append(
-                            _epr.getMessageIdColumn()).append("=?").append(
-                            " and ").append(_epr.getStatusColumn())
-                            .append("=?");
-                }
+				if (!_epr.getURL().contains("hsqldb"))
+				{
+					sb = new StringBuilder("select ").append(
+							_epr.getDataColumn()).append(" from ").append(
+							_epr.getTableName()).append(" where ").append(
+							_epr.getMessageIdColumn()).append("=?").append(
+							" and ").append(_epr.getStatusColumn())
+							.append("=?").append(" for update");
+				}
+				else
+				{
+					/*
+					 * HSQL does not support FOR UPDATE! All tables appear to
+					 * be inherently updatable!
+					 */
+					
+					sb = new StringBuilder("select ").append(
+							_epr.getDataColumn()).append(" from ").append(
+							_epr.getTableName()).append(" where ").append(
+							_epr.getMessageIdColumn()).append("=?").append(
+							" and ").append(_epr.getStatusColumn())
+							.append("=?");
+				}
 
 				_prepSel4Upd = getConn().prepareStatement(sb.toString());
 			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
 			catch (Exception e)
 			{
-				_logger.warn(e);
-				
-				throw new SQLException("Caught exception during prepared statement: "+e);
+				_logger.debug(e);
+				return null;
 			}
 		}
-		
+
 		return _prepSel4Upd;
-	}
-	
-	protected PreparedStatement updateStatusStatement() throws SQLException
+	} // ________________________________
+
+	protected PreparedStatement updateStatusStatement()
 	{
 		if (null == _prepUpdateStatus)
 			try
@@ -447,25 +558,23 @@
 						.append(_epr.getMessageIdColumn()).append("=?");
 				_prepUpdateStatus = getConn().prepareStatement(sb.toString());
 			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
 			catch (Exception e)
 			{
-				_logger.warn(e);
-				
-				throw new SQLException("Caught exception during prepared statement: "+e);
+				_logger.debug(e);
+				return null;
 			}
 		return _prepUpdateStatus;
 	} // ________________________________
 
-	protected PreparedStatement insertStatement() throws SQLException {
-    if (null == _prepInsert)
-        try {
-            String[] columns =
-                    {_epr.getMessageIdColumn(), _epr.getDataColumn(),
-                            _epr.getStatusColumn(), _epr.getTimestampColumn()};
+	protected PreparedStatement insertStatement()
+	{
+		if (null == _prepInsert)
+			try
+			{
+				String[] columns =
+				{ _epr.getMessageIdColumn(), _epr.getDataColumn(),
+						_epr.getStatusColumn(), _epr.getTimestampColumn() };
+
 				StringBuilder sb = new StringBuilder("insert into ").append(
 						_epr.getTableName()).append("(");
 				int i1 = 0;
@@ -474,20 +583,15 @@
 				sb.append(") values (?,?,?,?)");
 				_prepInsert = getConn().prepareStatement(sb.toString());
 			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
 			catch (Exception e)
 			{
-				_logger.warn(e);
-				
-				throw new SQLException("Caught exception during prepared statement: "+e);
+				_logger.debug(e);
+				return null;
 			}
 		return _prepInsert;
 	} // ________________________________
 
-	protected PreparedStatement deleteStatement() throws SQLException
+	protected PreparedStatement deleteStatement()
 	{
 		if (null == _prepDelete)
 			try
@@ -497,49 +601,48 @@
 						_epr.getMessageIdColumn()).append(" =?");
 				_prepDelete = getConn().prepareStatement(sb.toString());
 			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
 			catch (Exception e)
 			{
-				_logger.warn(e);
-				
-				throw new SQLException("Caught exception during prepared statement: "+e);
+				_logger.debug(e);
+				return null;
 			}
 		return _prepDelete;
 	} // ________________________________
 
-    protected enum State {
-        Pending, WorkInProgress, Done, Error;
+	protected enum State
+	{
+		Pending, WorkInProgress, Done, Error;
+		String getColumnValue()
+		{
+			return toString().substring(0, 1);
+		}
+	}
 
-        String getColumnValue() {
-            return toString().substring(0, 1);
-        }
-    }
+	public void setPollLatency(Long millis)
+	{
+		if (millis <= 200)
+			_logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of "+_pollLatency);
+		else
+			_pollLatency = millis;
+	} // ________________________________
+	
+	protected long _pollLatency = 200;
+	protected long _sleepForRetries = 3000; // milliseconds
 
-    public void setPollLatency(Long millis) {
-        if (millis <= 200)
-            _logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of " + _pollLatency);
-        else
-            _pollLatency = millis;
-    } // ________________________________
+	protected boolean _postDelete, _errorDelete;
+	protected boolean _isReceiver;
 
-    protected long _pollLatency = 200;
-    protected long _sleepForRetries = 3000; // milliseconds
+	protected JDBCEpr _epr;
 
-    protected boolean _postDelete, _errorDelete;
-    protected boolean _isReceiver;
+	protected JdbcCleanConn _conn;
 
-    protected JDBCEpr _epr;
+	protected PreparedStatement _prepGetList;
+	protected PreparedStatement _prepSel4Upd;
+	protected PreparedStatement _prepUpdateStatus;
+	protected PreparedStatement _prepInsert;
+	protected PreparedStatement _prepDelete;
+	
+	private boolean transactional = false;
 
-    protected JdbcCleanConn _conn;
-
-    protected PreparedStatement _prepGetList;
-    protected PreparedStatement _prepSel4Upd;
-    protected PreparedStatement _prepUpdateStatus;
-    protected PreparedStatement _prepInsert;
-    protected PreparedStatement _prepDelete;
-
-    protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
-}
+	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+}
\ No newline at end of file

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -27,6 +27,7 @@
 import java.net.URL;
 
 import javax.transaction.Status;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
 import org.apache.log4j.Logger;
@@ -207,5 +208,82 @@
                 throw new TransactionStrategyException("Failed to mark the transaction on current thread for rollback", th) ;
             }
         }
+        
+        /**
+         * Get a handle on the currently associated transaction (or null).
+         * @throws TransactionStrategyException
+         */
+        
+        public Object getTransaction () throws TransactionStrategyException
+        {
+        	try
+        	{
+        		return tm.getTransaction();
+        	}
+        	catch (final Throwable th)
+        	{
+        		throw new TransactionStrategyException("Problem when trying to getTransaction: ",th);
+        	}
+        }
+        
+        /**
+         * Suspend the current thread-to-transaction association.
+         * 
+         * @return the associated transaction, or null.
+         * @throws TransactionStrategyException
+         */
+        public Object suspend () throws TransactionStrategyException
+        {
+        	try
+        	{
+        		return tm.suspend();
+        	}
+        	catch (final Throwable th)
+        	{
+        		throw new TransactionStrategyException("Problem when trying to suspend transaction: ",th);
+        	}
+        }
+        
+        /**
+         * Associated the transaction with the current thread.
+         * @param tx
+         * @throws TransactionStrategyException
+         */
+        public void resume (Object tx) throws TransactionStrategyException
+        {
+        	try
+        	{
+        		tm.resume((Transaction) tx);
+        	}
+        	catch (final Throwable th)
+        	{
+        		throw new TransactionStrategyException("Problem when trying to resume transaction: ",th);
+        	}
+        }
+        
+        /**
+         * Is the currently associated transaction active?
+         * @return
+         * @throws TransactionStrategyException
+         */
+        public boolean isActive () throws TransactionStrategyException
+        {
+        	try
+        	{
+        		if (tm.getStatus() == Status.STATUS_ACTIVE)
+        			return true;
+        		else
+        			return false;
+        	}
+        	catch (final Throwable th)
+        	{
+        		throw new TransactionStrategyException("Problem when trying to get transaction status: ",th);
+        	}
+        }
+        
+        public String toString ()
+        {
+        	return "JTATransactionStrategy";
+        }
     }
 }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -91,6 +91,52 @@
         throws TransactionStrategyException ;
     
     /**
+     * Get a handle on the currently associated transaction (or null).
+     * @throws TransactionStrategyException
+     */
+    
+    public abstract Object getTransaction () throws TransactionStrategyException;
+    
+    /**
+     * Suspend the current thread-to-transaction association.
+     * 
+     * @return the associated transaction, or null.
+     * @throws TransactionStrategyException
+     */
+    public abstract Object suspend () throws TransactionStrategyException;
+    
+    /**
+     * Is the currently associated transaction active?
+     * @return
+     * @throws TransactionStrategyException
+     */
+    public abstract boolean isActive () throws TransactionStrategyException;
+    
+    /**
+     * Associated the transaction with the current thread.
+     * @param tx
+     * @throws TransactionStrategyException
+     */
+    public abstract void resume (Object tx) throws TransactionStrategyException;
+    
+    public static void setStrategy (TransactionStrategy txSt)
+    {
+    	_currentStrategy.set(txSt);
+    }
+    
+    public static TransactionStrategy getStrategy ()
+    {
+    	return _currentStrategy.get();
+    }
+    
+    public static void removeStrategy ()
+    {
+    	_currentStrategy.remove();
+    }
+    
+    private final static ThreadLocal<TransactionStrategy> _currentStrategy = new ThreadLocal<TransactionStrategy>();
+    
+    /**
      * The null transaction strategy.
      * @author kevin
      */
@@ -124,5 +170,50 @@
             throws TransactionStrategyException
         {
         }
+        
+        /**
+         * Get a handle on the currently associated transaction (or null).
+         * @throws TransactionStrategyException
+         */
+        
+        public Object getTransaction () throws TransactionStrategyException
+        {
+        	return null;
+        }
+        
+        /**
+         * Suspend the current thread-to-transaction association.
+         * 
+         * @return the associated transaction, or null.
+         * @throws TransactionStrategyException
+         */
+        public Object suspend () throws TransactionStrategyException
+        {
+        	return null;
+        }
+        
+        /**
+         * Associated the transaction with the current thread.
+         * @param tx
+         * @throws TransactionStrategyException
+         */
+        public void resume (Object tx) throws TransactionStrategyException
+        {
+        }
+        
+        /**
+         * Is the currently associated transaction active?
+         * @return
+         * @throws TransactionStrategyException
+         */
+        public boolean isActive () throws TransactionStrategyException
+        {
+        	return false;
+        }
+        
+        public String toString ()
+        {
+        	return "NullTransactionStrategy";
+        }
     }
 }
\ No newline at end of file

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -38,6 +38,8 @@
 	private DataSource m_oDS = null;
 
 	private Connection m_conn = null;
+	
+	private boolean transactional = false;
 
 	protected List<PreparedStatement> m_olPrepSt = new ArrayList<PreparedStatement>();
 
@@ -45,13 +47,19 @@
 
 	public JdbcCleanConn(DataSource p_oDS)
 	{
+		this(p_oDS, false);
+	}
+	
+	public JdbcCleanConn(DataSource p_oDS, boolean transactional)
+	{
 		m_oDS = p_oDS;
 		m_oLogger = Logger.getLogger(this.getClass());
+		this.transactional = transactional;
 	}
 	
 	public void commit() throws SQLException
 	{
-		if (null != m_conn)
+		if ((null != m_conn) && (!transactional))
 		{
 			m_conn.commit();
 		}
@@ -59,7 +67,7 @@
 
 	public void rollback() throws SQLException
 	{
-		if (null != m_conn)
+		if ((null != m_conn) && (!transactional))
 		{
 			m_conn.rollback();
 		} else {
@@ -73,7 +81,7 @@
 	
 	public void release()
 	{
-		if (null != m_conn)
+		if ((null != m_conn) && (!transactional))
 		{
 			try
 			{
@@ -135,6 +143,9 @@
 			connect();
 		}
 
+		if (p_PS == null)
+			throw new SQLException("Null PreparedStatement!");
+		
 		SQLException eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
 		for (int i1 = 0; i1 < iQtry; i1++)
@@ -172,6 +183,9 @@
 			connect();
 		}
 
+		if (p_PS == null)
+			throw new SQLException("Null PreparedStatement!");
+		
 		SQLException eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
 		for (int i1 = 0; i1 < iQtry; i1++)
@@ -250,8 +264,11 @@
 			throw new RuntimeException("connect() FAILED: no connection");
 		}
 
-		m_conn.setAutoCommit(false);
-		m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+		if (!transactional)
+		{
+			m_conn.setAutoCommit(false);
+			m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+		}
 
 		m_olPrepSt.clear();
 

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -57,6 +57,7 @@
 	/** Listeners */
 	public static final String LISTENER_CLASS_TAG            = "listenerClass";
         public static final String TRANSACTED_TAG            = "transacted";
+        public static final String ROLLBACK_ON_PIPELINE_FAULTS = "rollbackOnPipelineFaults";
     
 	/** Deployment */
 	public static final String DEPLOYMENT_NAME_TAG 			= "deployment";

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -271,9 +271,18 @@
 			boolean bErrorDel = Boolean.valueOf(tree.getAttribute(
 					JDBCEpr.ERROR_DEL_TAG, "true"));
 			JDBCEpr epr = new JDBCEpr(url, bPostDel, bErrorDel);
-			epr.setDriver(tree.getRequiredAttribute(JDBCEpr.DRIVER_TAG));
-			epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
-			epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
+			
+			String datasource = tree.getAttribute(JDBCEpr.DATASOURCE_TAG);
+			
+			if (datasource == null)
+			{
+				epr.setDriver(tree.getRequiredAttribute(JDBCEpr.DRIVER_TAG));
+				epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
+				epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
+			}
+			else
+				epr.setDatasource(datasource);
+			
 			epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
 			epr.setMessageIdColumn(getColName(tree, JDBCEpr.MESSAGE_ID_COLUMN_TAG));
 			epr.setStatusColumn(getColName(tree, JDBCEpr.STATUS_COLUMN_TAG));

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -65,10 +65,9 @@
 		}
 		
 		if (provider.getDatasource() != null) {
-			if ((provider.getUrl() != null)
-					|| (provider.getUsername() != null)
+			if ((provider.getUsername() != null)
 					|| (provider.getDriver() != null)) {	
-				throw new ConfigurationException ("Invalid sql-provider configuration : a datasource and a URL/username/password/driver "
+				throw new ConfigurationException ("Invalid sql-provider configuration : a datasource and a username/password/driver "
 						+ "combination cannot both be defined.   Use only one (datasource or JDBC connection info)." 
 						+ "Datasource : [" + provider.getDatasource() + "] JDBC URL [" + provider.getUrl() + "]");
 			}			

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -32,6 +32,8 @@
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -85,6 +87,10 @@
          * The error delay.
          */
         private long errorDelay ;
+        
+        private TransactionStrategy transactionStrategy;
+        private boolean transactional = false;
+        private boolean rollbackOnPipelineFaults = true;
 
         /**
 	 * public constructor
@@ -156,6 +162,11 @@
                     }
                 }
                 _latencySecs = lSeconds ;
+                
+                transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+                transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+                
+                rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
 	}
 
         /**
@@ -261,17 +272,53 @@
             }
         }
 
+        /**
+         * We have JMS transactional delivery/work semantics: before pulling a unit of work
+         * we start a transaction. If the pipeline completes successfully then we will
+         * commit that transaction and the OUW will be deleted. If we have to roll back
+         * the transaction then the UOW will be placed back on the input "queue" (assumes that
+         * the courier is transactional).
+         * 
+         * @param maxWaitMillis
+         */
 	public void waitForEventAndProcess (long maxWaitMillis)
 	{
 		Message message = null ;
+		boolean problem = false;
+		
 		try
 		{
+			transactionStrategy.begin();
+			
+			/*
+			 * If this is a transactional receive then the courier
+			 * needs to be reset afterwards, because we can only
+			 * guarantee one instance per transaction. If the courier
+			 * instance does some handy multiplexing internally across different
+			 * transactions then we won't be able to handle that at this level.
+			 * However, at the moment that isn't an issue.
+			 */
+			
+			TransactionStrategy.setStrategy(transactionStrategy);
+			
 			message = (maxWaitMillis > 0) ? _pickUpCourier
 					.pickup(maxWaitMillis) : null;
                         errorDelay = 0 ;
 		}
+		catch (TransactionStrategyException ex)
+		{
+			// could not begin transaction!
+			
+			_logger.error("Could not begin transaction!");
+			
+			problem = true;
+			
+			return;
+		}
 		catch (CourierTimeoutException e)
 		{
+			problem = true;
+			
 			return;
 		}
 		catch (FaultMessageException fme)
@@ -280,35 +327,79 @@
 		}
 		catch (CourierException e)
 		{
-                        _logger.debug("Courier Exception", e);
-                        if (errorDelay == 0)
-                        {
-                            errorDelay = MIN_ERROR_DELAY ;
-                        }
-                        else if (errorDelay < MAX_ERROR_DELAY)
-                        {
-                            errorDelay <<= 1 ;
-                        }
-                        _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
-                        waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+			_logger.debug("Courier Exception", e);
+			if (errorDelay == 0)
+			{
+				errorDelay = MIN_ERROR_DELAY ;
+			}
+			else if (errorDelay < MAX_ERROR_DELAY)
+			{
+				errorDelay <<= 1 ;
+			}
+			e.printStackTrace();
+			_logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
+			waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+			problem = true;
+			
 			return;
 		}
+		finally
+		{
+			if (problem)
+			{
+				try
+				{
+					if (transactionStrategy.getTransaction() != null)
+					{
+						CourierUtil.cleanCourier(_pickUpCourier);
+					
+						resetCourier();
+					}
+				}
+				catch (Throwable ex)
+				{
+					CourierUtil.cleanCourier(_pickUpCourier);
+					
+					resetCourier(); // to be on the safe side!
+				}
+			
+				rollbackTransaction();
+			}
+			
+			TransactionStrategy.removeStrategy();
+		}
 
 		if (null != message)
 		{
-                    final Message pipelineMessage = message ;
-                    final Runnable pipelineRunner = new Runnable() {
-                        public void run() {
-                            try {
-                                pipeline.process(pipelineMessage) ;
-                            } finally {
-                                updateThreadCount(-1) ;
-                            }
-                        }
-                    } ;
-                    updateThreadCount(+1);
-                    _execService.execute(pipelineRunner);
+			try
+			{
+				final Message pipelineMessage = message ;
+				final Object txHandle = transactionStrategy.suspend();
+				final TransactionalRunner txRunner = new TransactionalRunner(_pickUpCourier, pipelineMessage, txHandle);
+				
+				updateThreadCount(+1);
+				_execService.execute(txRunner);
+				
+				if (transactional)
+				{
+					_pickUpCourier = null;  // runner will clean it up.
+					
+					resetCourier();  // we need another courier for the next msg.
+				}
+			}
+			catch (TransactionStrategyException ex)
+			{
+				_logger.warn("Caught transaction related exception: ", ex);
+				rollbackTransaction();
+			}
 		}
+		else
+		{
+			// nothing to do, so roll back the transaction before returning.
+			
+			rollbackTransaction();
+		}
 
 	} // ________________________________
 
@@ -392,7 +483,134 @@
                 }
             }
         }
+        
+        private void rollbackTransaction ()
+        {
+        	try
+        	{
+        		transactionStrategy.rollbackOnly();
+        		transactionStrategy.terminate();
+        	}
+        	catch (Throwable ex)
+        	{
+        		_logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+        	}
+        }
+        
+        private void resetCourier ()
+        {
+        	TwoWayCourier pickUpCourier = null;
 
+            try
+            {
+                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+                try
+                {
+                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                        "setPollLatency", new Class[] { Long.class });
+                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+                }
+                catch (final NoSuchMethodException nsme)
+                {
+                        // OK, just leave it null
+                }
+                catch (final Exception ex)
+                {
+                    CourierUtil.cleanCourier(pickUpCourier);
+                    
+                    _logger.error("Problems invoking setPollLatency(long)", ex);
+                }
+            }
+            catch (final MalformedEPRException mepre)
+            {
+                _logger.error("Malformed EPR: " + _epr) ;
+            }
+            catch (final CourierException ce)
+            {
+                _logger.error("No appropriate courier can be obtained for " + _epr, ce);
+            }
+
+            _pickUpCourier = pickUpCourier;
+        }
+
+        class TransactionalRunner implements Runnable
+        {
+        	public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+        	{
+        		_courier = courier;
+        		_pipelineMessage = pipelineMessage;
+        		_txHandle = txHandle;
+        	}
+        	
+        	public void run()
+        	{
+        		boolean problem = false;
+        		
+        		try
+        		{
+        			transactionStrategy.resume(_txHandle);
+        			
+        			pipeline.setTransactional(transactional);
+        			
+        			TransactionStrategy.setStrategy(transactionStrategy);
+        			
+        			/*
+        			 * Current strategy is to commit as long as process returns true.
+        			 * If fails, or any exceptions are caught, then we roll back.
+        			 * 
+        			 * TODO re-examine the semantics around true/false from the pipeline.
+        			 */
+        			
+        			// TODO consider adding a RollbackOnFalse option to allow override.
+        			
+        			problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+        			if (!problem)
+        			{
+        				transactionStrategy.terminate();
+        			}
+        		}
+        		catch (TransactionStrategyException ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught transaction exception: ", ex);
+        		}
+        		catch (RuntimeException ex)
+        		{
+        			problem = true;
+        			
+        			throw ex;
+        		}
+        		catch (Throwable ex)
+        		{
+        			problem = true;
+        			
+        			_logger.warn("TransactionalRunner caught throwable: ",ex);
+        		}
+        		finally
+        		{
+        			if (problem)
+        			{
+        				rollbackTransaction();
+        			}
+        				
+        			if (transactional)
+        			{
+        				CourierUtil.cleanCourier(_courier);
+        			}
+        			
+        			TransactionStrategy.removeStrategy();
+        			
+        			updateThreadCount(-1);
+        		}
+        	}
+        	
+        	private PickUpOnlyCourier _courier;
+        	private Message _pipelineMessage;
+        	private Object _txHandle;
+        };
+        
         private ConfigTree _config;
 
         private String _eprCategoryName;

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -26,6 +26,8 @@
 import org.apache.log4j.Logger;
 import org.hibernate.Session;
 import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
 import org.jboss.soa.esb.monitoring.StatisticsBean;
 import org.jboss.soa.esb.monitoring.StatisticsData;
@@ -160,6 +162,21 @@
 	 * @param f_sb statistics bean
 	 */
 	public void insertStatistics(StatisticsBean f_sb) {
+		TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+		Object txHandle = null;
+		
+		if (txStrategy != null)
+		{
+			try
+			{
+				txHandle = txStrategy.suspend();
+			}
+			catch (TransactionStrategyException ex)
+			{
+				// if it failed, then the next work will fail too.
+			}
+		}
+		
 		Session sess = null;
 		Transaction tx = null;
 		try {
@@ -187,6 +204,18 @@
 			sess.flush();
 			sess.close();
 			sess = null;
+			
+			if (txHandle != null)
+			{
+				try
+				{
+					txStrategy.resume(txHandle);
+				}
+				catch (TransactionStrategyException ex)
+				{
+					logger.error("Problem resuming transaction!", ex);
+				}
+			}
 		}
 	}
 	

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -25,6 +25,8 @@
 import org.apache.log4j.Logger;
 import org.hibernate.Session;
 import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
 import org.jboss.soa.esb.monitoring.pojo.JMXOperation;
 import org.jboss.soa.esb.monitoring.pojo.JMXOperationResult;
@@ -65,6 +67,20 @@
 	 * @see org.jboss.soa.esb.monitoring.server.Filer#persistData()
 	 */
 	public void persistData() {
+		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		Object txHandle = null;
+		
+		if (txS != null)
+		{
+			try
+			{
+				txHandle = txS.suspend();
+			}
+			catch (TransactionStrategyException ex)
+			{
+			}
+		}
+		
 		Session sess = null;
 		Transaction tx = null;
 		try {
@@ -86,6 +102,18 @@
 			sess.flush();
 			sess.close();
 			sess = null;
+			
+			if (txHandle != null)
+			{
+				try
+				{
+					txS.resume(txHandle);
+				}
+				catch (TransactionStrategyException ex)
+				{
+					logger.error("Problem resuming transaction!", ex);
+				}
+			}
 		}
 	}
 }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java	2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java	2007-12-29 01:37:43 UTC (rev 17426)
@@ -28,6 +28,8 @@
 import org.hibernate.Query;
 import org.hibernate.Session;
 import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.message.body.content.ServiceControlCommand;
 import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
 import org.jboss.soa.esb.monitoring.OperationsData;
@@ -93,6 +95,20 @@
 	public void insertOperations(ServiceControlCommand f_ob) {
 		Session sess = null;
 		Transaction tx = null;
+		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		Object txHandle = null;
+		
+		if (txS != null)
+		{
+			try
+			{
+				txHandle = txS.suspend();
+			}
+			catch (TransactionStrategyException ex)
+			{
+			}
+		}
+		
 		try {
 			sess = (Session) MonitoringSessionFactory.getInstance().openSession();
 			tx = sess.beginTransaction();
@@ -119,6 +135,18 @@
 			sess.flush();
 			sess.close();
 			sess = null;
+			
+			if (txHandle != null)
+			{
+				try
+				{
+					txS.resume(txHandle);
+				}
+				catch (TransactionStrategyException ex)
+				{
+					logger.error("Problem resuming transaction!", ex);
+				}
+			}
 		}
 	}
 	
@@ -130,6 +158,20 @@
 	public void updateActiveFlag(String serverName) {
 		Session sess = null;
 		Transaction tx = null;
+		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		Object txHandle = null;
+		
+		if (txS != null)
+		{
+			try
+			{
+				txHandle = txS.suspend();
+			}
+			catch (TransactionStrategyException ex)
+			{
+			}
+		}
+		
 		try {
 			sess = (Session) MonitoringSessionFactory.getInstance().openSession();
 			tx = sess.beginTransaction();
@@ -151,6 +193,18 @@
 			sess.flush();
 			sess.close();
 			sess = null;
+			
+			if (txHandle != null)
+			{
+				try
+				{
+					txS.resume(txHandle);
+				}
+				catch (TransactionStrategyException ex)
+				{
+					logger.error("Problem resuming transaction!", ex);
+				}
+			}
 		}
 	}
 	




More information about the jboss-svn-commits mailing list