[jboss-svn-commits] JBL Code SVN: r29656 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/couriers and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Oct 19 07:43:30 EDT 2009


Author: tfennelly
Date: 2009-10-19 07:43:29 -0400 (Mon, 19 Oct 2009)
New Revision: 29656

Modified:
   labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1810
There is no retry limit when using transacted SQL listener

Modified: labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd
===================================================================
--- labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd	2009-10-19 11:43:29 UTC (rev 29656)
@@ -1447,6 +1447,13 @@
 					</xsd:documentation>
 				</xsd:annotation>
 			</xsd:attribute>
+            <xsd:attribute name="retry-count-column" type="xsd:string" use="optional">
+                <xsd:annotation>
+                    <xsd:documentation xml:lang="en">
+                        Column name for storing the message delivery retry count.
+                    </xsd:documentation>
+                </xsd:annotation>
+            </xsd:attribute>
 			<xsd:attribute name="message-column"
 				type="xsd:string" use="required">
 				<xsd:annotation>

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -27,46 +27,60 @@
 import org.jboss.internal.soa.esb.util.StreamUtils;
 import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.couriers.CourierTransportException;
-import org.jboss.soa.esb.couriers.FaultMessageException;
+import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.couriers.*;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.Util;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Types;
+import java.sql.*;
 import java.util.UUID;
 
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
+    public static final String SQL_RETRY_COUNT = "org.jboss.soa.esb.sql.retry.count";
+
     protected long _pollLatency = 200;
 
     protected long _sleepForRetries = 3000; // milliseconds
 
     protected boolean deleteOnSuccess, deleteOnError;
-	protected boolean _isReceiver;
-	
-	private int messageType = Types.OTHER ;
+    protected boolean _isReceiver;
 
+    private boolean monitoringRetryCount;
+
+    private int messageType = Types.OTHER ;
+
     private JDBCEprDBResourceFactory jdbcFactory;
+    protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
 
-	protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+    /**
+     * Dead letter channel ServiceInvoker. Messages are delivered to the DLQ after the retry limit for a
+     * failed message has been exceeded.
+     */
+    private static ServiceInvoker dlQueueInvoker;
 
-	/**
+    /**
+     * Redelivery retry limit.
+     */
+    private static int retryLimit;
+
+    static {
+        String retryLimitConfig = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).getProperty(Environment.SQL_RETRY_LIMIT, "5").trim();
+        try {
+            retryLimit = Integer.parseInt(retryLimitConfig);
+        } catch (NumberFormatException e) {
+            retryLimit = 5;
+        }
+    }
+
+    /**
 	 * package protected constructor - Objects of Courier should only be
 	 * instantiated by the Factory
 	 * 
@@ -94,8 +108,10 @@
 		        .getErrorDelete()));
 
 	        jdbcFactory = new JDBCEprDBResourceFactory(epr);
-	}
 
+        monitoringRetryCount = (epr.getRetryCountColumn() != null);
+    }
+
 	public void cleanup() {
 	}
 
@@ -223,90 +239,129 @@
 		long limit = System.currentTimeMillis()
 				+ ((millis < 100) ? 100 : millis);
 
-		do
-		{
+        do
+        {
             boolean transactional = isTransactional();
-            Connection connection = jdbcFactory.createConnection(transactional);
+            MessagePickupProspect pickupProspect;
+
             try {
-                PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
+                pickupProspect = getPickupProspect(transactional);
+            } catch (Exception e) {
+                _logger.warn("Exception while attempting to lookup message pickup prospect.", e);
+                return null;
+            }
+
+            if(pickupProspect != null) {
+                Connection connection = jdbcFactory.createConnection(transactional);
                 try {
-                    ResultSet resultSet = listStatement.executeQuery();
-                    try {
-                        while (resultSet.next()) {
-                            String messageId = resultSet.getString(1);
+                    result = tryToPickup(pickupProspect, connection);
 
-                            result = tryToPickup(messageId, connection);
+                    // We've successfully picked up a message, so we can commit on a
+                    // non-transacted connection...
+                    if (!transactional) {
+                        connection.commit();
+                    }
 
-                            // We've successfully picked up a message, so we can commit on a
-                            // non-transacted connection...
-                            if (!transactional) {
-                                connection.commit();
-                            }
-
-                            if (result != null) {
-                                return result;
-                            }
+                    if (result != null) {
+                        if(transactional && monitoringRetryCount && pickupProspect.sendToDQL) {
+                            // Not going to deliver this message to the action pipeline
+                            // because it has already failed (been retried) a number of
+                            // times.  Sending to the DLQ...
+                            deliverToDLQ(result);
+                            return null;
+                        } else {
+                            return result;
                         }
-                    } finally {
+                    }
+                } catch (FaultMessageException e) {
+                    // The picked up message was a fault, generating this exception
+                    // in Factory.createExceptionFromFault.  Just rethrow...
+                    throw e;
+                } catch (Exception e) {
+                    _logger.warn("Exception during pickup", e);
+                    if (!transactional) {
                         try {
-                            resultSet.close();
-                        } catch (Exception e) {
-                            _logger.warn("SQL Exception closing ResultSet", e);
+                            connection.rollback();
+                        } catch (SQLException e1) {
+                            _logger.warn("SQL Exception during rollback", e);
                         }
                     }
+                    throw new CourierTransportException(e);
                 } finally {
                     try {
-                        listStatement.close();
-                    } catch (Exception e) {
-                        _logger.warn("SQL Exception closing PreparedStatement", e);
+                        connection.close();
+                    } catch (SQLException e) {
+                        _logger.warn("Error closing DataSource Connection.", e);
                     }
                 }
-            } catch (FaultMessageException e) {
-                // The picked up message was a fault, generating this exception
-                // in Factory.createExceptionFromFault.  Just rethrow...
-                throw e;
-            } catch (Exception e) {
-                _logger.warn("Exception during pickup", e);
-                if (!transactional) {
-                    try {
-                        connection.rollback();
-                    } catch (SQLException e1) {
-                        _logger.warn("SQL Exception during rollback", e);
-                    }
-                }
-                throw new CourierTransportException(e);
-            } finally {
+
                 try {
-                    connection.close();
-                } catch (SQLException e) {
-                    _logger.warn("Error closing DataSource Connection.", e);
+                    long lSleep = limit - System.currentTimeMillis();
+                    if (_pollLatency < lSleep)
+                        lSleep = _pollLatency;
+                    if (lSleep > 0)
+                        Thread.sleep(lSleep);
                 }
+                catch (InterruptedException e) {
+                    return null;
+                }
             }
+        } while (System.currentTimeMillis() <= limit);
 
+        return null;
+    }
+
+    private MessagePickupProspect getPickupProspect(boolean transactional) throws TransactionStrategyException, CourierServiceBindException, SQLException, CourierTransportException {
+        MessagePickupProspect prospect = null;
+
+        // Attempt to read a prospect's details from the DB...
+        Connection connection = jdbcFactory.createConnection(transactional);
+        try {
+            PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
             try {
-                long lSleep = limit - System.currentTimeMillis();
-                if (_pollLatency < lSleep)
-                    lSleep = _pollLatency;
-                if (lSleep > 0)
-                    Thread.sleep(lSleep);
+                ResultSet resultSet = listStatement.executeQuery();
+                try {
+                    if (resultSet.next()) {
+                        prospect = new MessagePickupProspect();
+                        prospect.messageId = resultSet.getString(1);
+                        prospect.timestamp = resultSet.getLong(2);
+                        prospect.status = resultSet.getString(3);
+
+                        if (transactional && monitoringRetryCount) {
+                            prospect.retryCount = resultSet.getInt(4);
+                        }
+                    }
+                } finally {
+                    resultSet.close();
+                }
+            } finally {
+                listStatement.close();
             }
-            catch (InterruptedException e) {
-                return null;
+        } finally {
+            connection.close();
+        }
+
+        // If we read a prospect...
+        if(prospect != null && transactional && monitoringRetryCount) {
+            if(prospect.retryCount >= retryLimit) {
+                prospect.sendToDQL = true;
             }
-        } while (System.currentTimeMillis() <= limit);
+            prospect = updateRetryCount(prospect);
+        }
 
-        return null;
+        return prospect;
     }
 
-    private Message tryToPickup(String messageId, Connection connection) throws CourierException, SQLException
+    private Message tryToPickup(MessagePickupProspect prospect, Connection connection) throws CourierException, SQLException
 	{
-        PreparedStatement selectUpdateStatement = jdbcFactory.createSelect4UpdateStatement(connection);
+        PreparedStatement selectPicukupMessageStatement = jdbcFactory.createSelectPickupMessage(connection);
 
         try {
-            selectUpdateStatement.setString(1, messageId);
-            selectUpdateStatement.setString(2, State.Pending.getColumnValue());
+            selectPicukupMessageStatement.setString(1, prospect.messageId);
+            selectPicukupMessageStatement.setString(2, prospect.status);
+            selectPicukupMessageStatement.setLong(3, prospect.timestamp);
 
-            ResultSet resultSet = selectUpdateStatement.executeQuery();
+            ResultSet resultSet = selectPicukupMessageStatement.executeQuery();
             try
             {
                 if (resultSet.next())
@@ -322,7 +377,7 @@
                         case Types.BLOB:
                             final Blob blob = resultSet.getBlob(1) ;
                             final byte[] blobData = ((blob != null) ? StreamUtils.readStream(blob.getBinaryStream()) : null);
-                            
+
                             if (blobData != null)
                                 value = new String(blobData) ;
                             else
@@ -336,7 +391,7 @@
                             break ;
                         case Types.CLOB:
                             final Clob clob = resultSet.getClob(1) ;
-                            
+
                             if (clob != null)
                                 value = StreamUtils.readReader(clob.getCharacterStream());
                             else
@@ -361,13 +416,13 @@
                         result = null;
                     } finally {
                         if (result == null && deleteOnError) {
-                            deleteMsg(messageId, connection);
+                            deleteMsg(prospect.messageId, connection);
                         } else if (result != null && deleteOnSuccess) {
-                            deleteMsg(messageId, connection);
+                            deleteMsg(prospect.messageId, connection);
                         } else if(result == null) {
-                            changeStatus(messageId, State.Error, connection);
+                            changeStatus(prospect.messageId, State.Error, connection);
                         } else {
-                            changeStatus(messageId, State.Done, connection);
+                            changeStatus(prospect.messageId, State.Done, connection);
                         }
                     }
 
@@ -384,12 +439,71 @@
                 }
             }
         } finally {
-            selectUpdateStatement.close();
+            selectPicukupMessageStatement.close();
         }
 
         return null;
 	}
 
+    private MessagePickupProspect updateRetryCount(MessagePickupProspect prospect) throws SQLException, CourierServiceBindException, CourierTransportException, TransactionStrategyException {
+        TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+        Object suspendedTX;
+
+        suspendedTX = txStrategy.suspend();
+        try {
+            txStrategy.begin();
+
+            try {
+                Connection connection = jdbcFactory.createConnection(true);
+                boolean updated = false;
+
+                try {
+                    PreparedStatement updateStatement = jdbcFactory.createUpdateRetryCountStatement(connection);
+
+                    try {
+                        long readTimestamp = prospect.timestamp;
+                        long updateTimeout = System.currentTimeMillis();
+
+                        if(prospect.sendToDQL) {
+                            // Mark status as error...
+                            prospect.status = SqlTableCourier.State.Error.getColumnValue();
+                        }
+
+                        updateStatement.setInt(1, prospect.retryCount + 1);
+                        updateStatement.setString(2, prospect.status);
+                        updateStatement.setLong(3, updateTimeout);
+                        updateStatement.setString(4, prospect.messageId);
+                        updateStatement.setLong(5, readTimestamp);
+
+                        // Update the timestamp as the pickup will need this...
+                        prospect.timestamp = updateTimeout;
+
+                        updated = (updateStatement.executeUpdate() == 1);
+                    } finally {
+                        updateStatement.close();
+                    }
+                } finally {
+                    connection.close();
+                }
+                txStrategy.terminate();
+
+                // Only return the prospect if the retry count was successfully updated
+                // and committed...
+                if(updated) {
+                    return prospect;
+                }
+            } catch (Exception e){
+                txStrategy.rollbackOnly();
+                txStrategy.terminate();
+                _logger.debug("Error updating message retry count: " + e.getMessage());
+            }
+        } finally {
+            txStrategy.resume(suspendedTX);
+        }
+
+        return null;
+    }
+
     private void deleteMsg(String messageId, Connection connection) throws SQLException
 	{
         PreparedStatement statement = jdbcFactory.createDeleteStatement(connection);
@@ -510,4 +624,33 @@
         }
         return messageType ;
     }
+
+    /**
+     * Deliver a message to the Dead Letter Channel Service.
+     *
+     * @throws org.jboss.soa.esb.listeners.message.MessageDeliverException Message delivery failure.
+     */
+    protected void deliverToDLQ(Message message) throws MessageDeliverException {
+        if (!"true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())) {
+            _logger.debug("org.jboss.soa.esb.dls.redeliver is turned off");
+        } else {
+            if (dlQueueInvoker == null) {
+                synchronized (ServiceInvoker.dlqService) {
+                    if (dlQueueInvoker == null) {
+                        dlQueueInvoker = new ServiceInvoker(ServiceInvoker.dlqService);
+                    }
+                }
+            }
+
+            dlQueueInvoker.deliverAsync(message);
+        }
+    }
+
+    private class MessagePickupProspect {
+        private String messageId;
+        private String status;
+        private long timestamp;
+        public int retryCount;
+        private boolean sendToDQL;
+    }
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -50,7 +50,9 @@
     private String listStatementSQL;
     private String select4UpdateStatementSQL;
     private String updateStatusStatementSQL;
+    private String updateRetryCountStatementSQL;
     private String deleteStatementSQL;
+    private String selectPickupMessage;
 
     public JDBCEprDBResourceFactory(JDBCEpr epr) throws CourierServiceBindException {
         this.epr = epr;
@@ -110,6 +112,13 @@
         return connection.prepareStatement(select4UpdateStatementSQL);
     }
 
+    public PreparedStatement createSelectPickupMessage(Connection connection) throws SQLException {
+        if (selectPickupMessage == null) {
+            selectPickupMessage = buildSelectPickupMessage();
+        }
+        return connection.prepareStatement(selectPickupMessage);
+    }
+
     public PreparedStatement createUpdateStatusStatement(Connection connection) throws SQLException {
         if (updateStatusStatementSQL == null) {
             updateStatusStatementSQL = buildUpdateStatusStatementSQL();
@@ -117,6 +126,13 @@
         return connection.prepareStatement(updateStatusStatementSQL);
     }
 
+    public PreparedStatement createUpdateRetryCountStatement(Connection connection) throws SQLException {
+        if (updateRetryCountStatementSQL == null) {
+            updateRetryCountStatementSQL = buildUpdateRetryCountStatementSQL();
+        }
+        return connection.prepareStatement(updateRetryCountStatementSQL);
+    }
+
     public PreparedStatement createInsertStatement(Connection connection) throws SQLException {
         if (insertStatementSQL == null) {
             insertStatementSQL = buildInsertStatementSQL();
@@ -134,29 +150,48 @@
     private String buildSelect4UpdateStatementSQL() {
         StringBuilder sb = new StringBuilder("select ");
 
+        sb = sb.append(epr.getDataColumn())
+                .append(" from ")
+                .append(epr.getTableName())
+                .append(" where ")
+                .append(epr.getMessageIdColumn()).append(" = ?")
+                .append(" and ").append(epr.getStatusColumn()).append(" = ?");
+
         if (!epr.getURL().contains("hsqldb")) {
-            sb = sb.append(
-                    epr.getDataColumn()).append(" from ").append(
-                            epr.getTableName()).append(" where ").append(
-                                    epr.getMessageIdColumn()).append("=?").append(
-                                    " and ").append(epr.getStatusColumn())
-                                    .append("=?").append(" for update");
+            sb.append(" for update");
         } else {
             /*
              * HSQL does not support FOR UPDATE! All tables appear to
              * be inherently updatable!
              */
-            sb = sb.append(
-                    epr.getDataColumn()).append(" from ").append(
-                            epr.getTableName()).append(" where ").append(
-                                    epr.getMessageIdColumn()).append("=?").append(
-                                    " and ").append(epr.getStatusColumn())
-                                    .append("=?");
         }
 
         return sb.toString();
     }
 
+    private String buildSelectPickupMessage() {
+        StringBuilder sb = new StringBuilder("select ");
+
+        sb = sb.append(epr.getDataColumn())
+                .append(" from ")
+                .append(epr.getTableName())
+                .append(" where ")
+                .append(epr.getMessageIdColumn()).append(" = ?")
+                .append(" and ").append(epr.getStatusColumn()).append(" = ?")
+                .append(" and ").append(epr.getTimestampColumn()).append(" = ?");
+
+        if (!epr.getURL().contains("hsqldb")) {
+            sb.append(" for update");
+        } else {
+            /*
+             * HSQL does not support FOR UPDATE! All tables appear to
+             * be inherently updatable!
+             */
+        }
+
+        return sb.toString();
+    }
+
     private String buildUpdateStatusStatementSQL() {
             StringBuilder sb = new StringBuilder("update ").append(
                     epr.getTableName()).append(" set ").append(
@@ -166,6 +201,20 @@
             return sb.toString();
     }
 
+    private String buildUpdateRetryCountStatementSQL() {
+            StringBuilder sb = new StringBuilder("update ")
+                    .append(epr.getTableName())
+                    .append(" set ")
+                    .append(epr.getRetryCountColumn()).append("= ?, ")
+                    .append(epr.getStatusColumn()).append("= ?, ")
+                    .append(epr.getTimestampColumn()).append("= ?")
+                    .append(" where ")
+                    .append(epr.getMessageIdColumn()).append("=? and ")
+                    .append(epr.getTimestampColumn()).append("=?");
+
+            return sb.toString();
+    }
+
     private String buildInsertStatementSQL() {
             StringBuilder sb = new StringBuilder();
 
@@ -185,7 +234,14 @@
 
         sb.append("select ");
         sb.append(epr.getMessageIdColumn()).append(", ");
-        sb.append(epr.getTimestampColumn());
+        sb.append(epr.getTimestampColumn()).append(", ");
+        sb.append(epr.getStatusColumn());
+
+        String retryColumn = epr.getRetryCountColumn();
+        if(retryColumn != null) {
+            sb.append(", " + retryColumn);
+        }
+
         sb.append(" from ").append(epr.getTableName());
         sb.append(" where ").append(epr.getStatusColumn());
         sb.append(" = '").append(SqlTableCourier.State.Pending.getColumnValue()).append("'");

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -32,6 +32,7 @@
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.PortReference;
 import org.jboss.soa.esb.addressing.XMLUtil;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
@@ -57,6 +58,7 @@
 	public static final String ORDER_BY_TAG				= "orderBy";
 	public static final String MESSAGE_ID_COLUMN_TAG 	= "message_id_column";
 	public static final String STATUS_COLUMN_TAG 		= "status_column";
+    public static final String RETRY_COUNT_COLUMN_TAG 	= "retry_count_column";
 	public static final String DATA_COLUMN_TAG 			= "message_column";
 	public static final String TIMESTAMP_COLUMN_TAG 	= "insert_timestamp_column";
 	public static final String WHERE_CONDITION_TAG 		= "whereCondition";
@@ -148,6 +150,11 @@
 																{
 																	if (tag.equals(ERROR_DEL_TAG))
 																		getAddr().addExtension(ERROR_DEL_TAG, nl.item(i).getTextContent());
+                                                                    else
+                                                                    {
+                                                                        if (tag.equals(RETRY_COUNT_COLUMN_TAG))
+                                                                            setRetryCountColumn(nl.item(i).getTextContent());
+                                                                    }
 																}
 															}
 														}
@@ -400,7 +407,36 @@
 	{
 		return getAddr().getExtensionValue(MESSAGE_ID_COLUMN_TAG);
 	}
-	
+
+    /**
+     * Set the retry count column name that is used by this EPR.
+     *
+     * @param columnName the column name for the retry count.
+     * @throws URISyntaxException thrown if this EPR is malformed.
+     */
+
+    public final void setRetryCountColumn (String columnName) throws URISyntaxException
+    {
+        if(columnName == null) {
+            return;
+        }
+
+        if (retryCountSet)
+            throw new IllegalStateException("Retry Count column already set.");
+
+        getAddr().addExtension(RETRY_COUNT_COLUMN_TAG, columnName);
+        retryCountSet = true;
+    }
+
+    /**
+     * @return the retry count column used by this EPR.
+     */
+
+    public final String getRetryCountColumn ()
+    {
+        return getAddr().getExtensionValue(RETRY_COUNT_COLUMN_TAG);
+    }
+
 	/**
 	 * Set the status column that is used by this EPR.
 	 * 
@@ -521,6 +557,7 @@
 	private boolean driverSet = false;
 	private boolean tableNameSet = false;
 	private boolean messageIdSet = false;
+    private boolean retryCountSet = false;
 	private boolean statusSet = false;
 	private boolean dataSet = false;
 	private boolean timestampSet = false;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -42,6 +42,7 @@
     public static final String DEFAULT_INVM_SCOPE     = "jboss.esb.invm.scope.default";
     public static final String INVM_EXPIRY_TIME = "org.jboss.soa.esb.invm.expiryTime";
     public static final String INVM_RETRY_LIMIT = "org.jboss.soa.esb.invm.retry.limit";
+    public static final String SQL_RETRY_LIMIT = "org.jboss.soa.esb.sql.retry.limit";
 
     public static final String SMTP_HOST     = "org.jboss.soa.esb.mail.smtp.host";
         public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -360,6 +360,7 @@
 			
 			epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
 			epr.setMessageIdColumn(getColName(tree, JDBCEpr.MESSAGE_ID_COLUMN_TAG));
+            epr.setRetryCountColumn(tree.getAttribute(JDBCEpr.RETRY_COUNT_COLUMN_TAG));
 			epr.setStatusColumn(getColName(tree, JDBCEpr.STATUS_COLUMN_TAG));
 			epr.setDataColumn(getColName(tree, JDBCEpr.DATA_COLUMN_TAG));
 			epr.setTimestampColumn(getColName(tree, JDBCEpr.TIMESTAMP_COLUMN_TAG));

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java	2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java	2009-10-19 11:43:29 UTC (rev 29656)
@@ -135,6 +135,7 @@
 		toElement.setAttribute(JDBCEpr.WHERE_CONDITION_TAG, messageFilter.getWhereCondition());
 		toElement.setAttribute(JDBCEpr.ORDER_BY_TAG, messageFilter.getOrderBy());
 		toElement.setAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG, messageFilter.getMessageIdColumn());
+        toElement.setAttribute(JDBCEpr.RETRY_COUNT_COLUMN_TAG, messageFilter.getRetryCountColumn());
 		toElement.setAttribute(JDBCEpr.DATA_COLUMN_TAG, messageFilter.getMessageColumn());
 		toElement.setAttribute(JDBCEpr.STATUS_COLUMN_TAG, String.valueOf(messageFilter.getStatusColumn()));
 		toElement.setAttribute(JDBCEpr.TIMESTAMP_COLUMN_TAG, messageFilter.getInsertTimestampColumn());



More information about the jboss-svn-commits mailing list