[jboss-svn-commits] JBL Code SVN: r19553 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta: src/org/jboss/internal/soa/esb/couriers/helpers and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Apr 14 13:51:27 EDT 2008


Author: tfennelly
Date: 2008-04-14 13:51:27 -0400 (Mon, 14 Apr 2008)
New Revision: 19553

Added:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
Log:
SqlTableCourier cleanup: http://jira.jboss.com/jira/browse/JBESB-1626

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-04-14 16:45:02 UTC (rev 19552)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-04-14 17:51:27 UTC (rev 19553)
@@ -22,40 +22,40 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
-
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.helpers.JDBCEprDBResourceFactory;
 import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.common.TransactionStrategy;
 import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.CourierTransportException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierServiceBindException;
-import org.jboss.soa.esb.couriers.CourierMarshalUnmarshalException;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
-import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.couriers.*;
 import org.jboss.soa.esb.listeners.message.errors.Factory;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.util.Type;
 import org.jboss.soa.esb.util.Util;
-import org.xml.sax.SAXParseException;
 
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
+    protected long _pollLatency = 200;
+
+    protected long _sleepForRetries = 3000; // milliseconds
+
+    protected boolean deleteOnSuccess, deleteOnError;
+	protected boolean _isReceiver;
+
+    private JDBCEprDBResourceFactory jdbcFactory;
+
+	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+
 	/**
 	 * package protected constructor - Objects of Courier should only be
 	 * instantiated by the Factory
@@ -65,7 +65,7 @@
 	SqlTableCourier(JDBCEpr epr) throws CourierException
 	{
 		this(epr, false);
-	}
+    }
 
 	/**
 	 * package protected constructor - Objects of Courier should only be
@@ -76,13 +76,12 @@
 	SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
 	{
 		_isReceiver = isReceiver;
-		_epr = epr;
 		_sleepForRetries = 3000;  // TODO magic number - configurable?
 		try
 		{
-			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+			deleteOnSuccess = Boolean.TRUE.equals(Boolean.valueOf(epr
 					.getPostDelete()));
-			_errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+			deleteOnError = Boolean.TRUE.equals(Boolean.valueOf(epr
 					.getErrorDelete()));
 		}
 		catch (URISyntaxException e)
@@ -90,25 +89,12 @@
 			throw new CourierException(e);
 		}
 
-	} // ________________________________
+        jdbcFactory = new JDBCEprDBResourceFactory(epr);
+	}
 
-	public void cleanup()
-	{
-		if (null != _conn)
-		{
-			try
-			{
-				_conn.release();
-			}
-			catch (Exception e)
-			{
-				e.printStackTrace();
-				_logger.info("Unable to release connection", e);
-			}
-		}
+	public void cleanup() {
+	}
 
-	} // ________________________________
-
 	/**
 	 * package the ESB message in a java.io.Serializable, and write it.
 	 * Delivery occurs within its own transaction if there is no
@@ -129,7 +115,7 @@
 		if (null == message)
 			return false;
 
-		String msgId = null;
+		String msgId;
 		Call call = message.getHeader().getCall();
 		if (null==call)
 			message.getHeader().setCall(call=new Call());
@@ -144,572 +130,303 @@
 			throw new CourierException("Problems with message header ",e);
 		}
 
-		try
+        boolean transactional;
+        try
 		{
-        		TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
-        		Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
-        		boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-        		
-        		transactional = (txHandle != null);
-        		
-        		/*
-        		 * Make sure the current transaction is still active! If we
-        		 * have previously slept, then the timeout may be longer than that
-        		 * associated with the transaction.
-        		 */
-        		
-        		if (transactional && !isActive)
-        		{
-        			throw new CourierException("Associated transaction is no longer active!");
-        		}
+            TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+            Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+            boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+
+            transactional = (txHandle != null);
+
+            /*
+             * Make sure the current transaction is still active! If we
+             * have previously slept, then the timeout may be longer than that
+             * associated with the transaction.
+             */
+
+            if (transactional && !isActive)
+            {
+                throw new CourierException("Associated transaction is no longer active!");
+            }
 		}
 		catch (TransactionStrategyException ex)
 		{
 		    throw new CourierException(ex);
 		}
-		
-		if (null == _conn)
-		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (final Exception e)
-			{
-				throw new CourierServiceBindException(e);
-			}
-		}
 
-		while (_conn != null)
-		{
-			try
-			{
-				int iCol = 1;
-				PreparedStatement PS = insertStatement();
-				PS.setString(iCol++, msgId);
-				PS.setObject(iCol++, Util.serialize(message));
-				PS.setString(iCol++, State.Pending.getColumnValue());
-				PS.setLong(iCol++, System.currentTimeMillis());
+        Serializable serilaizedMessage;
+        try {
+            serilaizedMessage = Util.serialize(message);
+        } catch (Exception e) {
+            throw new CourierTransportException("Unable to serialize ESB Message.", e);
+        }
 
-				_conn.execUpdWait(PS, 3);
-				
-				if (!transactional)
-				    _conn.commit();
-				
-				return true;
-			}
-			catch (SQLException e)
-			{
-				if (null != _conn)
-				{
-					try
-					{
-					    if (!transactional)
-						_conn.rollback();
-					}
-					catch (Exception roll)
-					{
-						_logger.debug(roll);
-					}
-				}
-				
-				_logger.debug("SQL exception during deliver", e);
-				throw new CourierTransportException(e);
-			}
-			catch (Exception e)
-			{
-				if (!jdbcConnectRetry(e))
-				    throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
-			}
-		}
-		return false;
-	} // ________________________________
+        Connection connection = jdbcFactory.createConnection(transactional);
+        try
+        {
+            PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
+            try {
+                insertStatement.setString(1, msgId);
+                insertStatement.setObject(2, serilaizedMessage);
+                insertStatement.setString(3, State.Pending.getColumnValue());
+                insertStatement.setLong(4, System.currentTimeMillis());
 
-	public Message pickup(long millis) throws CourierException, CourierTimeoutException
+                insertStatement.executeUpdate();
+            } finally {
+                insertStatement.close();
+            }
+
+            if (!transactional) {
+                connection.commit();
+            }
+
+            return true;
+        }
+        catch (SQLException e)
+        {
+            try
+            {
+                if (!transactional) {
+                    connection.rollback();
+                }
+            }
+            catch (Exception roll)
+            {
+                _logger.debug(roll);
+            }
+
+            _logger.debug("SQL exception during deliver", e);
+            throw new CourierTransportException(e);
+        } finally {
+            try {
+                if (!transactional) {
+                    connection.close();
+                }
+            } catch (SQLException e) {
+                _logger.error("Exception while closing DataSource connection.", e);
+            }
+        }
+	}
+
+    public Message pickup(long millis) throws CourierException, CourierTimeoutException
 	{
 		Message result = null;
 		long limit = System.currentTimeMillis()
 				+ ((millis < 100) ? 100 : millis);
-		
+
 		do
 		{
-			try
+            boolean transactional;
+            try
 			{
 				TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
 				Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
 				boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
-				
+
 				transactional = (txHandle != null);
-				
+
 				/*
 				 * Make sure the current transaction is still active! If we
 				 * have previously slept, then the timeout may be longer than that
 				 * associated with the transaction.
 				 */
-				
+
 				/*
 				 * MessageAwareListener will catch exceptions and roll back the transaction.
 				 */
-				
+
 				if (transactional && !isActive)
 				{
 				    _logger.error("SqlTableCourier - associated transaction is no longer active!");
-				   
+
 				    throw new CourierException("Associated transaction is no longer active!");
 				}
 			}
 			catch (TransactionStrategyException ex)
 			{
 				_logger.error("Could not determine transaction association!", ex);
-				
+
 				throw new CourierException("Could not determine transaction association!");
 			}
-			
-			ResultSet RS = null;
-			
-			try
-			{
-			    RS = getRowList();
 
-			    while (null != RS && RS.next())
-			    {
-				String messageId = RS.getString(1);
+            Connection connection = jdbcFactory.createConnection(transactional);
+            try {
+                PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
+                try {
+                    ResultSet resultSet = listStatement.executeQuery();
+                    try {
+                        while (resultSet.next()) {
+                            String messageId = resultSet.getString(1);
 
-				if (null == (result = tryToPickup(messageId)))
-				    continue;
+                            result = tryToPickup(messageId, connection);
 
-				/*
-				 * If this is fault message, then throw an exception with the contents. With the
-				 * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
-				 */
+                            // We've successfully picked up a message, so we can commit on a
+                            // non-transacted connection...
+                            if (!transactional) {
+                                connection.commit();
+                            }
 
-				if (Type.isFaultMessage(result))
-				    Factory.createExceptionFromFault(result);
+                            if (result != null) {
+                                /*
+                                 * If this is fault message, then throw an exception with the contents. With the
+                                 * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
+                                 */
+                                if (Type.isFaultMessage(result)) {
+                                    Factory.createExceptionFromFault(result);
+                                } else {
+                                    return result;
+                                }
+                            }
+                        }
+                    } finally {
+                        try {
+                            resultSet.close();
+                        } catch (Exception e) {
+                            _logger.warn("SQL Exception closing ResultSet", e);
+                        }
+                    }
+                } finally {
+                    try {
+                        listStatement.close();
+                    } catch (Exception e) {
+                        _logger.warn("SQL Exception closing PreparedStatement", e);
+                    }
+                }
+            } catch (FaultMessageException e) {
+                // The picked up message was a fault, generating this exception
+                // in Factory.createExceptionFromFault.  Just rethrow...
+                throw e;
+            } catch (Exception e) {
+                _logger.warn("Exception during pickup", e);
+                if (!transactional) {
+                    try {
+                        connection.rollback();
+                    } catch (SQLException e1) {
+                        _logger.warn("SQL Exception during rollback", e);
+                    }
+                }
+                throw new CourierTransportException(e);
+            } finally {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    _logger.warn("Error closing DataSource Connection.", e);
+                }
+            }
 
-				return result;
-			    }
-			}
-			catch (SQLException e)
-			{
-			    _logger.warn("SQL Exception during pickup", e);
-			    
-			    throw new CourierTransportException(e);
-			}
-			finally
-			{
-			    try
-			    {
-        			    if (RS != null)
-        				RS.close();
-			    }
-			    catch (final SQLException ex)
-			    {
-				_logger.warn("SQLException during close of ResultSet.", ex);
-			    }
-			    
-			    // Added to make sure we release transactions from all paths
-			    if (_conn != null)
-			    {
-				try
-				{
-				    if (!transactional)
-					_conn.rollback() ;
-				}
-				catch (final SQLException sqle) {} //ignore
-			    }
-			}
-			
-			try
-			{
-			    long lSleep = limit - System.currentTimeMillis();
-			    if (_pollLatency < lSleep)
-				lSleep = _pollLatency;
-			    if (lSleep > 0)
-				Thread.sleep(lSleep);
-			}
-			catch (InterruptedException e)
-			{
-			    return null;
-			}
-		} while (System.currentTimeMillis() <= limit);
-		return null;
-	} // ________________________________
+            try {
+                long lSleep = limit - System.currentTimeMillis();
+                if (_pollLatency < lSleep)
+                    lSleep = _pollLatency;
+                if (lSleep > 0)
+                    Thread.sleep(lSleep);
+            }
+            catch (InterruptedException e) {
+                return null;
+            }
+        } while (System.currentTimeMillis() <= limit);
 
-	private Message tryToPickup(String messageId) throws CourierException,
-			SQLException
-	{
-		int iParm = 1;
+        return null;
+    }
 
-		select4UpdateStatement().setString(iParm++, messageId);
-		select4UpdateStatement().setString(iParm++,
-				State.Pending.getColumnValue());
-
-		while (_conn != null)
-		{
-		    ResultSet RS = null;
-		    
-			try
-			{
-				RS = _conn.execQueryWait(select4UpdateStatement(), 3);
-				
-				while (RS.next())
-				{
-					Exception eBad = null;
-					try
-					{
-						Message result = Util.deserialize((Serializable) RS
-								.getObject(1));
-						if (_postDelete)
-							deleteMsg(messageId);
-						else
-							changeStatus(messageId, State.Done);
-						return result;
-					}
-					catch (ClassCastException e)
-					{
-						eBad = e;
-					}
-					catch (SAXParseException e)
-					{
-						eBad = e;
-					}
-					catch (final IOException e)
-					{
-					    throw new CourierMarshalUnmarshalException(e);
-					}
-					catch (Exception e)
-					{
-						throw new CourierException(e);
-					}
-					if (null != eBad)
-					{
-						if (_errorDelete)
-							deleteMsg(messageId);
-						else
-							changeStatus(messageId, State.Error);
-						continue;
-					}
-				}
-				return null;
-			}
-			catch (SQLException e)
-			{
-				throw new CourierTransportException(e);
-			}
-			catch (Exception ex)
-			{
-			    // bail-out now if we can't reconnect, rather than lose the error in the next sweep.
-			    
-				if (!jdbcConnectRetry(ex))
-				    throw new CourierTransportException("Caught unexpected exception during SQL receive and could not reconnect!", ex);
-			}
-			finally
-			{
-			    try
-			    {
-				if (RS != null)
-				    RS.close();
-			    }
-			    catch (final Exception ex)
-			    {
-				_logger.warn("Could not close ResultSet.", ex);
-			    }
-			}
-		}
-		return null;
-	} // ________________________________
-
-	private void deleteMsg(String messageId) throws SQLException
+    private Message tryToPickup(String messageId, Connection connection) throws CourierException, SQLException
 	{
-		int iParm = 1;
-		deleteStatement().setString(iParm++, messageId);
-		_conn.execUpdWait(deleteStatement(), 3);
-		
-		if (!transactional)
-			_conn.commit();
-	}
+        PreparedStatement selectUpdateStatement = jdbcFactory.createSelect4UpdateStatement(connection);
 
-	private void changeStatus(String messageId, State to) throws SQLException
-	{
-		int iParm = 1;
-		updateStatusStatement().setString(iParm++, to.getColumnValue());
-		updateStatusStatement().setString(iParm++, messageId);
-		_conn.execUpdWait(updateStatusStatement(), 3);
-		
-		if (!transactional)
-			_conn.commit();
-	}
+        try {
+            selectUpdateStatement.setString(1, messageId);
+            selectUpdateStatement.setString(2, State.Pending.getColumnValue());
 
-	private ResultSet getRowList() throws CourierException
-	{
-		if (null == _conn)
-		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (final Exception e)
-			{
-				throw new CourierServiceBindException(e);
-			}
-		}
-		while (_conn != null)
-		{
-			try
-			{
-				return _conn.execQueryWait(listStatement(), 3);
-			}
-			catch (Exception e)
-			{
-				_logger.debug("Problem encountered while executing query.", e);
-				e.printStackTrace();
-				
-				jdbcConnectRetry(e);
-			}
-		}
-		return null;
+            ResultSet resultSet = selectUpdateStatement.executeQuery();
+            try
+            {
+                if (resultSet.next())
+                {
+                    Message result = null;
 
-	} // _______________________________
+                    try
+                    {
+                        Serializable blob = (Serializable) resultSet.getObject(1);
+                        result = Util.deserialize(blob);
+                    }
+                    catch (Exception e)
+                    {
+                        // If there's an error deserializing the message blob, we either
+                        // delete the message (deleteOnError), or change it's state
+                        // to "State.Error" i.e. no exceptions/rollbacks...
+                        result = null;
+                    } finally {
+                        if (result == null && deleteOnError) {
+                            deleteMsg(messageId, connection);
+                        } else if (result != null && deleteOnSuccess) {
+                            deleteMsg(messageId, connection);
+                        } else if(result == null) {
+                            changeStatus(messageId, State.Error, connection);
+                        } else {
+                            changeStatus(messageId, State.Done, connection);
+                        }
+                    }
 
-	private boolean jdbcConnectRetry(Exception exc)
-	{
-		_logger.debug("DB problem, will try to reconnect", exc);
-		
-		cleanup();
-		_conn = null;
+                    return result;
+                }
+            }
+            finally
+            {
+                try
+                {
+                    resultSet.close();
+                } catch (final Exception ex) {
+                    _logger.warn("Could not close ResultSet.", ex);
+                }
+            }
+        } finally {
+            selectUpdateStatement.close();
+        }
 
-		_prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
-		for (int i1 = 0; i1 < 3; i1++)
-		{
-			try
-			{
-				_conn = getConn();
-			}
-			catch (Exception e)
-			{
-				try
-				{
-					Thread.sleep(_sleepForRetries);
-				}
-				catch (InterruptedException eInt)
-				{
-					return false;
-				}
-			}
-		}
+        return null;
+	}
 
-		return !(_conn == null);
-	} // ________________________________
-
-	private JdbcCleanConn getConn() throws SQLException, MalformedEPRException, NamingException
+    private void deleteMsg(String messageId, Connection connection) throws SQLException
 	{
-		if (null == _conn)
-		{
-			try
-			{
-				DataSource DS = null;
-				if (_epr.getDatasource() == null) {
-					DS = new SimpleDataSource(_epr.getDriver(), 
-						_epr.getURL(), _epr.getUserName(), _epr.getPassword());
-				} else {
-					InitialContext initContext;
-					try {
-						initContext = new InitialContext();
-						DS = (DataSource) initContext.lookup(_epr.getDatasource());
-					} catch (NamingException e) {
-						_logger.error("Problem resolving DataSource through JNDI", e);
-						
-						throw e; // it'll get wrapped later anyway!
-					}
-				}
-				_conn = new JdbcCleanConn(DS, transactional);
-			}
-			catch (URISyntaxException ex)
-			{
-				throw new MalformedEPRException(ex);
-			}
-		}
-		return _conn;
-	} // ________________________________
+        PreparedStatement statement = jdbcFactory.createDeleteStatement(connection);
 
-	protected PreparedStatement listStatement() throws SQLException
-	{
-		if (null == _prepGetList)
-		{
-			try
-			{
-				String[] columns =
-				{ _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
+        try {
+            statement.setString(1, messageId);
+            statement.executeUpdate();
+        }   finally {
+            statement.close();
+        }
+    }
 
-				StringBuilder sb = new StringBuilder("select");
-				int i1 = 0;
-				for (String col : columns)
-					sb.append((i1++ < 1) ? " " : ",").append(col);
-				sb.append(" from ").append(_epr.getTableName());
-				sb.append(" where ").append(_epr.getStatusColumn())
-						.append("='").append(State.Pending.getColumnValue())
-						.append("'").append(" order by 2");
-				_prepGetList = getConn().prepareStatement(sb.toString());
-			}
-			catch (SQLException ex)
-			{
-				throw ex;
-			}
-			catch (Exception e)
-			{
-				e.printStackTrace();
-				_logger.debug("Unable to prepare SQL statement", e);
-				throw new SQLException("Problem encountered when trying to created PreparedStatement: "+e);
-			}
-		}
-		
-		return _prepGetList;
-	} // ________________________________
-
-	protected PreparedStatement select4UpdateStatement()
+    private void changeStatus(String messageId, State to, Connection connection) throws SQLException
 	{
-		if (_prepSel4Upd == null)
-		{
-			try
-			{
-				/*
-				 * TODO make this dynamic using a factory pattern.
-				 */
+        PreparedStatement statement = jdbcFactory.createUpdateStatusStatement(connection);
 
-				StringBuilder sb = null;
+        try {
+            statement.setString(1, to.getColumnValue());
+            statement.setString(2, messageId);
+            statement.executeUpdate();
+        } finally {
+            statement.close();
+        }
+    }
 
-				if (!_epr.getURL().contains("hsqldb"))
-				{
-					sb = new StringBuilder("select ").append(
-							_epr.getDataColumn()).append(" from ").append(
-							_epr.getTableName()).append(" where ").append(
-							_epr.getMessageIdColumn()).append("=?").append(
-							" and ").append(_epr.getStatusColumn())
-							.append("=?").append(" for update");
-				}
-				else
-				{
-					/*
-					 * HSQL does not support FOR UPDATE! All tables appear to
-					 * be inherently updatable!
-					 */
-					
-					sb = new StringBuilder("select ").append(
-							_epr.getDataColumn()).append(" from ").append(
-							_epr.getTableName()).append(" where ").append(
-							_epr.getMessageIdColumn()).append("=?").append(
-							" and ").append(_epr.getStatusColumn())
-							.append("=?");
-				}
-
-				_prepSel4Upd = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		}
-
-		return _prepSel4Upd;
-	} // ________________________________
-
-	protected PreparedStatement updateStatusStatement()
+    public static enum State
 	{
-		if (null == _prepUpdateStatus)
-			try
-			{
-				StringBuilder sb = new StringBuilder("update ").append(
-						_epr.getTableName()).append(" set ").append(
-						_epr.getStatusColumn()).append("= ?").append(" where ")
-						.append(_epr.getMessageIdColumn()).append("=?");
-				_prepUpdateStatus = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepUpdateStatus;
-	} // ________________________________
-
-	protected PreparedStatement insertStatement()
-	{
-		if (null == _prepInsert)
-			try
-			{
-				String[] columns =
-				{ _epr.getMessageIdColumn(), _epr.getDataColumn(),
-						_epr.getStatusColumn(), _epr.getTimestampColumn() };
-
-				StringBuilder sb = new StringBuilder("insert into ").append(
-						_epr.getTableName()).append("(");
-				int i1 = 0;
-				for (String col : columns)
-					sb.append((i1++ < 1) ? " " : ",").append(col);
-				sb.append(") values (?,?,?,?)");
-				_prepInsert = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepInsert;
-	} // ________________________________
-
-	protected PreparedStatement deleteStatement()
-	{
-		if (null == _prepDelete)
-			try
-			{
-				StringBuilder sb = new StringBuilder("delete from ").append(
-						_epr.getTableName()).append(" where ").append(
-						_epr.getMessageIdColumn()).append(" =?");
-				_prepDelete = getConn().prepareStatement(sb.toString());
-			}
-			catch (Exception e)
-			{
-				_logger.debug(e);
-				return null;
-			}
-		return _prepDelete;
-	} // ________________________________
-
-	protected enum State
-	{
 		Pending, WorkInProgress, Done, Error;
-		String getColumnValue()
+
+        public String getColumnValue()
 		{
 			return toString().substring(0, 1);
 		}
-	}
+    }
 
-	public void setPollLatency(Long millis)
+    public void setPollLatency(Long millis)
 	{
 		if (millis <= 200)
 			_logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of "+_pollLatency);
 		else
 			_pollLatency = millis;
-	} // ________________________________
-	
-	protected long _pollLatency = 200;
-	protected long _sleepForRetries = 3000; // milliseconds
-
-	protected boolean _postDelete, _errorDelete;
-	protected boolean _isReceiver;
-
-	protected JDBCEpr _epr;
-
-	protected JdbcCleanConn _conn;
-
-	protected PreparedStatement _prepGetList;
-	protected PreparedStatement _prepSel4Upd;
-	protected PreparedStatement _prepUpdateStatus;
-	protected PreparedStatement _prepInsert;
-	protected PreparedStatement _prepDelete;
-	
-	private boolean transactional = false;
-
-	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+	}
 }
\ No newline at end of file

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	2008-04-14 17:51:27 UTC (rev 19553)
@@ -0,0 +1,253 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers.helpers;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.SqlTableCourier;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierServiceBindException;
+import org.jboss.soa.esb.couriers.CourierTransportException;
+import org.jboss.soa.esb.util.ClassUtil;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.DriverManager;
+
+/**
+ * Factory for creating JDBCEpr based database resources for the SQLTableCourier..
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JDBCEprDBResourceFactory {
+
+    private static Logger logger = Logger.getLogger(JDBCEprDBResourceFactory.class);
+
+    private JDBCEpr epr;
+
+    private DataSource dataSource;
+    private String insertStatementSQL;
+    private String listStatementSQL;
+    private String select4UpdateStatementSQL;
+    private String updateStatusStatementSQL;
+    private String deleteStatementSQL;
+
+    public JDBCEprDBResourceFactory(JDBCEpr epr) throws CourierServiceBindException {
+        this.epr = epr;
+
+        if (epr.getDatasource() != null) {
+            lookupDataSource(epr);
+        } else {
+            try {
+                try {
+                    ClassUtil.forName(epr.getDriver(), getClass());
+                } catch (ClassNotFoundException e) {
+                    throw new CourierServiceBindException("Database driver '" + epr.getDriver() + "' not available on classpath.");
+                }
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+            }
+        }
+    }
+
+
+    public Connection createConnection(boolean transactional) throws CourierTransportException, CourierServiceBindException {
+        Connection connection;
+
+        try {
+            if(dataSource != null) {
+                connection = dataSource.getConnection();
+            } else {
+                try {
+                    connection = DriverManager.getConnection(epr.getURL(), epr.getUserName(), epr.getPassword());
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+                }
+            }
+        } catch (SQLException e) {
+            throw new CourierTransportException("Failed to connect to DataSource.", e);
+        }
+
+        if (!transactional) {
+            try {
+                connection.setAutoCommit(false);
+            } catch (SQLException e) {
+                try {
+                    connection.close();
+                } catch (SQLException e1) {
+                    logger.error("Failed to close connection.", e1);
+                }
+                throw new CourierTransportException("Failed to turn off autoCommit on connection..", e);
+            }
+        }
+
+        return connection;
+    }
+
+    public PreparedStatement createListStatement(Connection connection) throws SQLException {
+        if (listStatementSQL == null) {
+            listStatementSQL = buildListStatementSQL();
+        }
+        return connection.prepareStatement(listStatementSQL);
+    }
+
+    public PreparedStatement createSelect4UpdateStatement(Connection connection) throws SQLException {
+        if (select4UpdateStatementSQL == null) {
+            select4UpdateStatementSQL = buildSelect4UpdateStatementSQL();
+        }
+        return connection.prepareStatement(select4UpdateStatementSQL);
+    }
+
+    public PreparedStatement createUpdateStatusStatement(Connection connection) throws SQLException {
+        if (updateStatusStatementSQL == null) {
+            updateStatusStatementSQL = buildUpdateStatusStatementSQL();
+        }
+        return connection.prepareStatement(updateStatusStatementSQL);
+    }
+
+    public PreparedStatement createInsertStatement(Connection connection) throws SQLException {
+        if (insertStatementSQL == null) {
+            insertStatementSQL = buildInsertStatementSQL();
+        }
+        return connection.prepareStatement(insertStatementSQL);
+    }
+
+    public PreparedStatement createDeleteStatement(Connection connection) throws SQLException {
+        if (deleteStatementSQL == null) {
+            deleteStatementSQL = buildDeleteStatementSQL();
+        }
+        return connection.prepareStatement(deleteStatementSQL);
+    }
+
+    private String buildSelect4UpdateStatementSQL() {
+        StringBuilder sb = new StringBuilder("select ");
+
+        try {
+            if (!epr.getURL().contains("hsqldb")) {
+                sb = sb.append(
+                        epr.getDataColumn()).append(" from ").append(
+                        epr.getTableName()).append(" where ").append(
+                        epr.getMessageIdColumn()).append("=?").append(
+                        " and ").append(epr.getStatusColumn())
+                        .append("=?").append(" for update");
+            } else {
+                /*
+                 * HSQL does not support FOR UPDATE! All tables appear to
+                 * be inherently updatable!
+                 */
+                sb = sb.append(
+                        epr.getDataColumn()).append(" from ").append(
+                        epr.getTableName()).append(" where ").append(
+                        epr.getMessageIdColumn()).append("=?").append(
+                        " and ").append(epr.getStatusColumn())
+                        .append("=?");
+            }
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+
+        return sb.toString();
+    }
+
+    private String buildUpdateStatusStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder("update ").append(
+                    epr.getTableName()).append(" set ").append(
+                    epr.getStatusColumn()).append("= ?").append(" where ")
+                    .append(epr.getMessageIdColumn()).append("=?");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private String buildInsertStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("insert into ").append(epr.getTableName());
+            sb.append(" (");
+            sb.append(epr.getMessageIdColumn()).append(", ");
+            sb.append(epr.getDataColumn()).append(", ");
+            sb.append(epr.getStatusColumn()).append(", ");
+            sb.append(epr.getTimestampColumn());
+            sb.append(") values (?,?,?,?)");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private String buildListStatementSQL() {
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("select ");
+            sb.append(epr.getMessageIdColumn()).append(", ");
+            sb.append(epr.getTimestampColumn());
+            sb.append(" from ").append(epr.getTableName());
+            sb.append(" where ").append(epr.getStatusColumn());
+            sb.append(" = '").append(SqlTableCourier.State.Pending.getColumnValue()).append("'");
+            sb.append(" order by 2");
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+
+        return sb.toString();
+    }
+
+    private String buildDeleteStatementSQL() {
+        try {
+            StringBuilder sb = new StringBuilder("delete from ").append(
+                    epr.getTableName()).append(" where ").append(
+                    epr.getMessageIdColumn()).append(" =?");
+
+            return sb.toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unexpected URISyntaxException from EPR getter method.", e);
+        }
+    }
+
+    private void lookupDataSource(JDBCEpr epr) throws CourierServiceBindException {
+        try
+        {
+            if (epr.getDatasource() != null) {
+                InitialContext initContext;
+                try {
+                    initContext = new InitialContext();
+                    dataSource = (DataSource) initContext.lookup(epr.getDatasource());
+                } catch (NamingException e) {
+                    logger.error("Problem resolving DataSource through JNDI", e);
+                    throw e; // it'll get wrapped later anyway!
+                }
+            }
+        }
+        catch (final Exception e)
+        {
+            throw new CourierServiceBindException("Failed to lookup DataSource.", e);
+        }
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2008-04-14 16:45:02 UTC (rev 19552)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2008-04-14 17:51:27 UTC (rev 19553)
@@ -235,6 +235,15 @@
 		throw eRet;
 	} // __________________________________
 
+
+    public Connection createConnection() throws SQLException {
+        if (m_oDS == null) {
+            throw new SQLException("DataSource is null!");
+        }
+
+        return m_oDS.getConnection();
+    }
+
 	private void connect() throws SQLException
 	{
 		if (m_conn != null)

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java	2008-04-14 16:45:02 UTC (rev 19552)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP2/product/rosetta/tests/src/org/jboss/soa/esb/addressing/eprs/tests/DefaultJdbcReplyToEprUnitTest.java	2008-04-14 17:51:27 UTC (rev 19553)
@@ -33,8 +33,11 @@
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.util.DefaultReplyTo;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
 import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
 import org.jboss.soa.esb.message.Message;
@@ -118,52 +121,43 @@
 
     
   @Test
-    public void testJdbcReplyEpr()
-    {
+    public void testJdbcReplyEpr() throws MalformedEPRException, CourierException, URISyntaxException, CourierTimeoutException {
   		_logger.info("_________________________________________");
     	_logger.info("testJdbcReplyEpr() invoked");
-        try
-        {
-        	//  Send a Message that will be picked up by a listener, and specify replyTo
-        	JDBCEpr toEpr = getEpr("foo");
-        	JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
+        //  Send a Message that will be picked up by a listener, and specify replyTo
+        JDBCEpr toEpr = getEpr("foo");
+        JDBCEpr replyToEpr = (JDBCEpr)DefaultReplyTo.getReplyTo(toEpr);
 
-        	String text_1 = "Outgoing";
-        	Message outgoingMsg = MessageFactory.getInstance().getMessage();
-        	outgoingMsg.getHeader().getCall().setTo(toEpr);
-        	outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
-        	outgoingMsg.getBody().add(text_1.getBytes());
-        	CourierUtil.deliverMessage(outgoingMsg);
+        String text_1 = "Outgoing";
+        Message outgoingMsg = MessageFactory.getInstance().getMessage();
+        outgoingMsg.getHeader().getCall().setTo(toEpr);
+        outgoingMsg.getHeader().getCall().setReplyTo(replyToEpr);
+        outgoingMsg.getBody().add(text_1.getBytes());
+        CourierUtil.deliverMessage(outgoingMsg);
 
-        	// Mock a service that picks up the original message and replies
-        	JDBCEpr serviceEpr = getEpr("foo");
-        	PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
-        	Message received = listener.pickup(100);
-        	String text_2 = new String((byte[]) received.getBody().get());
-        	assertTrue(text_1.equals(text_2));
+        // Mock a service that picks up the original message and replies
+        JDBCEpr serviceEpr = getEpr("foo");
+        PickUpOnlyCourier listener = CourierFactory.getPickupCourier(serviceEpr);
+        Message received = listener.pickup(100);
+        String text_2 = new String((byte[]) received.getBody().get());
+        assertTrue(text_1.equals(text_2));
 //        	assertTrue(replyToEpr.equals(received.getHeader().getCall().getReplyTo()));
-        	
-        	// now respond to replyTo
-        	text_2	+= " + processed by listener";
-        	Message response = MessageFactory.getInstance().getMessage();
-        	response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
-        	response.getBody().add(text_2.getBytes());
-        	CourierUtil.deliverMessage(response);
-        	
-        	// try to pick up reply
-        	PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
-        	Message finalMsg = waiter.pickup(100);
-        	assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
-        	
-        	_logger.info(text_2+"... and back from jdbc ReplyTo EPR");
-        	_logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
 
-        }
-        catch (Exception e)
-        {
-  			_logger.error(e);
-            assertTrue(false);
-        }
+        // now respond to replyTo
+        text_2	+= " + processed by listener";
+        Message response = MessageFactory.getInstance().getMessage();
+        response.getHeader().getCall().setTo(received.getHeader().getCall().getReplyTo());
+        response.getBody().add(text_2.getBytes());
+        CourierUtil.deliverMessage(response);
+
+        // try to pick up reply
+        PickUpOnlyCourier waiter = CourierFactory.getPickupCourier(replyToEpr);
+        Message finalMsg = waiter.pickup(100);
+        assertTrue(text_2.equals(new String((byte[]) finalMsg.getBody().get())));
+
+        _logger.info(text_2+"... and back from jdbc ReplyTo EPR");
+        _logger.info("getDefaultReplyToEpr test succeeded for JDBC message transport");
+
     }
 	private static void dropTable(String tableName) throws Exception
 	{




More information about the jboss-svn-commits mailing list