[jboss-svn-commits] JBL Code SVN: r29656 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/couriers and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Oct 19 07:43:30 EDT 2009
Author: tfennelly
Date: 2009-10-19 07:43:29 -0400 (Mon, 19 Oct 2009)
New Revision: 29656
Modified:
labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1810
There is no retry limit when using transacted SQL listener
Modified: labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd
===================================================================
--- labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.2.0.xsd 2009-10-19 11:43:29 UTC (rev 29656)
@@ -1447,6 +1447,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="retry-count-column" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation xml:lang="en">
+ Column name for storing the message delivery retry count.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="message-column"
type="xsd:string" use="required">
<xsd:annotation>
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -27,46 +27,60 @@
import org.jboss.internal.soa.esb.util.StreamUtils;
import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.couriers.CourierTransportException;
-import org.jboss.soa.esb.couriers.FaultMessageException;
+import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.couriers.*;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.Util;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Types;
+import java.sql.*;
import java.util.UUID;
public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
+ public static final String SQL_RETRY_COUNT = "org.jboss.soa.esb.sql.retry.count";
+
protected long _pollLatency = 200;
protected long _sleepForRetries = 3000; // milliseconds
protected boolean deleteOnSuccess, deleteOnError;
- protected boolean _isReceiver;
-
- private int messageType = Types.OTHER ;
+ protected boolean _isReceiver;
+ private boolean monitoringRetryCount;
+
+ private int messageType = Types.OTHER ;
+
private JDBCEprDBResourceFactory jdbcFactory;
+ protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
- protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+ /**
+ * Dead letter channel ServiceInvoker. Messages are delivered to the DLQ after the retry limit for a
+ * failed message has been exceeded.
+ */
+ private static ServiceInvoker dlQueueInvoker;
- /**
+ /**
+ * Redelivery retry limit.
+ */
+ private static int retryLimit;
+
+ static {
+ String retryLimitConfig = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).getProperty(Environment.SQL_RETRY_LIMIT, "5").trim();
+ try {
+ retryLimit = Integer.parseInt(retryLimitConfig);
+ } catch (NumberFormatException e) {
+ retryLimit = 5;
+ }
+ }
+
+ /**
* package protected constructor - Objects of Courier should only be
* instantiated by the Factory
*
@@ -94,8 +108,10 @@
.getErrorDelete()));
jdbcFactory = new JDBCEprDBResourceFactory(epr);
- }
+ monitoringRetryCount = (epr.getRetryCountColumn() != null);
+ }
+
public void cleanup() {
}
@@ -223,90 +239,129 @@
long limit = System.currentTimeMillis()
+ ((millis < 100) ? 100 : millis);
- do
- {
+ do
+ {
boolean transactional = isTransactional();
- Connection connection = jdbcFactory.createConnection(transactional);
+ MessagePickupProspect pickupProspect;
+
try {
- PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
+ pickupProspect = getPickupProspect(transactional);
+ } catch (Exception e) {
+ _logger.warn("Exception while attempting to lookup message pickup prospect.", e);
+ return null;
+ }
+
+ if(pickupProspect != null) {
+ Connection connection = jdbcFactory.createConnection(transactional);
try {
- ResultSet resultSet = listStatement.executeQuery();
- try {
- while (resultSet.next()) {
- String messageId = resultSet.getString(1);
+ result = tryToPickup(pickupProspect, connection);
- result = tryToPickup(messageId, connection);
+ // We've successfully picked up a message, so we can commit on a
+ // non-transacted connection...
+ if (!transactional) {
+ connection.commit();
+ }
- // We've successfully picked up a message, so we can commit on a
- // non-transacted connection...
- if (!transactional) {
- connection.commit();
- }
-
- if (result != null) {
- return result;
- }
+ if (result != null) {
+ if(transactional && monitoringRetryCount && pickupProspect.sendToDQL) {
+ // Not going to deliver this message to the action pipeline
+ // because it has already failed (been retried) a number of
+ // time. Sending to the DLQ...
+ deliverToDLQ(result);
+ return null;
+ } else {
+ return result;
}
- } finally {
+ }
+ } catch (FaultMessageException e) {
+ // The picked up message was a fault, generating this exception
+ // in Factory.createExceptionFromFault. Just rethrow...
+ throw e;
+ } catch (Exception e) {
+ _logger.warn("Exception during pickup", e);
+ if (!transactional) {
try {
- resultSet.close();
- } catch (Exception e) {
- _logger.warn("SQL Exception closing ResultSet", e);
+ connection.rollback();
+ } catch (SQLException e1) {
+ _logger.warn("SQL Exception during rollback", e);
}
}
+ throw new CourierTransportException(e);
} finally {
try {
- listStatement.close();
- } catch (Exception e) {
- _logger.warn("SQL Exception closing PreparedStatement", e);
+ connection.close();
+ } catch (SQLException e) {
+ _logger.warn("Error closing DataSource Connection.", e);
}
}
- } catch (FaultMessageException e) {
- // The picked up message was a fault, generating this exception
- // in Factory.createExceptionFromFault. Just rethrow...
- throw e;
- } catch (Exception e) {
- _logger.warn("Exception during pickup", e);
- if (!transactional) {
- try {
- connection.rollback();
- } catch (SQLException e1) {
- _logger.warn("SQL Exception during rollback", e);
- }
- }
- throw new CourierTransportException(e);
- } finally {
+
try {
- connection.close();
- } catch (SQLException e) {
- _logger.warn("Error closing DataSource Connection.", e);
+ long lSleep = limit - System.currentTimeMillis();
+ if (_pollLatency < lSleep)
+ lSleep = _pollLatency;
+ if (lSleep > 0)
+ Thread.sleep(lSleep);
}
+ catch (InterruptedException e) {
+ return null;
+ }
}
+ } while (System.currentTimeMillis() <= limit);
+ return null;
+ }
+
+ private MessagePickupProspect getPickupProspect(boolean transactional) throws TransactionStrategyException, CourierServiceBindException, SQLException, CourierTransportException {
+ MessagePickupProspect prospect = null;
+
+ // Attempt to read a prospect's details from the DB...
+ Connection connection = jdbcFactory.createConnection(transactional);
+ try {
+ PreparedStatement listStatement = jdbcFactory.createListStatement(connection);
try {
- long lSleep = limit - System.currentTimeMillis();
- if (_pollLatency < lSleep)
- lSleep = _pollLatency;
- if (lSleep > 0)
- Thread.sleep(lSleep);
+ ResultSet resultSet = listStatement.executeQuery();
+ try {
+ if (resultSet.next()) {
+ prospect = new MessagePickupProspect();
+ prospect.messageId = resultSet.getString(1);
+ prospect.timestamp = resultSet.getLong(2);
+ prospect.status = resultSet.getString(3);
+
+ if (transactional && monitoringRetryCount) {
+ prospect.retryCount = resultSet.getInt(4);
+ }
+ }
+ } finally {
+ resultSet.close();
+ }
+ } finally {
+ listStatement.close();
}
- catch (InterruptedException e) {
- return null;
+ } finally {
+ connection.close();
+ }
+
+ // If we read a prospect...
+ if(prospect != null && transactional && monitoringRetryCount) {
+ if(prospect.retryCount >= retryLimit) {
+ prospect.sendToDQL = true;
}
- } while (System.currentTimeMillis() <= limit);
+ prospect = updateRetryCount(prospect);
+ }
- return null;
+ return prospect;
}
- private Message tryToPickup(String messageId, Connection connection) throws CourierException, SQLException
+ private Message tryToPickup(MessagePickupProspect prospect, Connection connection) throws CourierException, SQLException
{
- PreparedStatement selectUpdateStatement = jdbcFactory.createSelect4UpdateStatement(connection);
+ PreparedStatement selectPicukupMessageStatement = jdbcFactory.createSelectPickupMessage(connection);
try {
- selectUpdateStatement.setString(1, messageId);
- selectUpdateStatement.setString(2, State.Pending.getColumnValue());
+ selectPicukupMessageStatement.setString(1, prospect.messageId);
+ selectPicukupMessageStatement.setString(2, prospect.status);
+ selectPicukupMessageStatement.setLong(3, prospect.timestamp);
- ResultSet resultSet = selectUpdateStatement.executeQuery();
+ ResultSet resultSet = selectPicukupMessageStatement.executeQuery();
try
{
if (resultSet.next())
@@ -322,7 +377,7 @@
case Types.BLOB:
final Blob blob = resultSet.getBlob(1) ;
final byte[] blobData = ((blob != null) ? StreamUtils.readStream(blob.getBinaryStream()) : null);
-
+
if (blobData != null)
value = new String(blobData) ;
else
@@ -336,7 +391,7 @@
break ;
case Types.CLOB:
final Clob clob = resultSet.getClob(1) ;
-
+
if (clob != null)
value = StreamUtils.readReader(clob.getCharacterStream());
else
@@ -361,13 +416,13 @@
result = null;
} finally {
if (result == null && deleteOnError) {
- deleteMsg(messageId, connection);
+ deleteMsg(prospect.messageId, connection);
} else if (result != null && deleteOnSuccess) {
- deleteMsg(messageId, connection);
+ deleteMsg(prospect.messageId, connection);
} else if(result == null) {
- changeStatus(messageId, State.Error, connection);
+ changeStatus(prospect.messageId, State.Error, connection);
} else {
- changeStatus(messageId, State.Done, connection);
+ changeStatus(prospect.messageId, State.Done, connection);
}
}
@@ -384,12 +439,71 @@
}
}
} finally {
- selectUpdateStatement.close();
+ selectPicukupMessageStatement.close();
}
return null;
}
+ private MessagePickupProspect updateRetryCount(MessagePickupProspect prospect) throws SQLException, CourierServiceBindException, CourierTransportException, TransactionStrategyException {
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+ Object suspendedTX;
+
+ suspendedTX = txStrategy.suspend();
+ try {
+ txStrategy.begin();
+
+ try {
+ Connection connection = jdbcFactory.createConnection(true);
+ boolean updated = false;
+
+ try {
+ PreparedStatement updateStatement = jdbcFactory.createUpdateRetryCountStatement(connection);
+
+ try {
+ long readTimestamp = prospect.timestamp;
+ long updateTimeout = System.currentTimeMillis();
+
+ if(prospect.sendToDQL) {
+ // Mark status as error...
+ prospect.status = SqlTableCourier.State.Error.getColumnValue();
+ }
+
+ updateStatement.setInt(1, prospect.retryCount + 1);
+ updateStatement.setString(2, prospect.status);
+ updateStatement.setLong(3, updateTimeout);
+ updateStatement.setString(4, prospect.messageId);
+ updateStatement.setLong(5, readTimestamp);
+
+ // Update the timestamp as the pickup will need this...
+ prospect.timestamp = updateTimeout;
+
+ updated = (updateStatement.executeUpdate() == 1);
+ } finally {
+ updateStatement.close();
+ }
+ } finally {
+ connection.close();
+ }
+ txStrategy.terminate();
+
+ // Only return the prospec if the retry count was successfully updated
+ // and commited...
+ if(updated) {
+ return prospect;
+ }
+ } catch (Exception e){
+ txStrategy.rollbackOnly();
+ txStrategy.terminate();
+ _logger.debug("Error updating message retry count: " + e.getMessage());
+ }
+ } finally {
+ txStrategy.resume(suspendedTX);
+ }
+
+ return null;
+ }
+
private void deleteMsg(String messageId, Connection connection) throws SQLException
{
PreparedStatement statement = jdbcFactory.createDeleteStatement(connection);
@@ -510,4 +624,33 @@
}
return messageType ;
}
+
+ /**
+ * Deliver a message to the Dead Letter Channel Service.
+ *
+ * @throws org.jboss.soa.esb.listeners.message.MessageDeliverException Message delivery failure.
+ */
+ protected void deliverToDLQ(Message message) throws MessageDeliverException {
+ if (!"true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())) {
+ _logger.debug("org.jboss.soa.esb.dls.redeliver is turned off");
+ } else {
+ if (dlQueueInvoker == null) {
+ synchronized (ServiceInvoker.dlqService) {
+ if (dlQueueInvoker == null) {
+ dlQueueInvoker = new ServiceInvoker(ServiceInvoker.dlqService);
+ }
+ }
+ }
+
+ dlQueueInvoker.deliverAsync(message);
+ }
+ }
+
+ private class MessagePickupProspect {
+ private String messageId;
+ private String status;
+ private long timestamp;
+ public int retryCount;
+ private boolean sendToDQL;
+ }
}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/helpers/JDBCEprDBResourceFactory.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -50,7 +50,9 @@
private String listStatementSQL;
private String select4UpdateStatementSQL;
private String updateStatusStatementSQL;
+ private String updateRetryCountStatementSQL;
private String deleteStatementSQL;
+ private String selectPickupMessage;
public JDBCEprDBResourceFactory(JDBCEpr epr) throws CourierServiceBindException {
this.epr = epr;
@@ -110,6 +112,13 @@
return connection.prepareStatement(select4UpdateStatementSQL);
}
+ public PreparedStatement createSelectPickupMessage(Connection connection) throws SQLException {
+ if (selectPickupMessage == null) {
+ selectPickupMessage = buildSelectPickupMessage();
+ }
+ return connection.prepareStatement(selectPickupMessage);
+ }
+
public PreparedStatement createUpdateStatusStatement(Connection connection) throws SQLException {
if (updateStatusStatementSQL == null) {
updateStatusStatementSQL = buildUpdateStatusStatementSQL();
@@ -117,6 +126,13 @@
return connection.prepareStatement(updateStatusStatementSQL);
}
+ public PreparedStatement createUpdateRetryCountStatement(Connection connection) throws SQLException {
+ if (updateRetryCountStatementSQL == null) {
+ updateRetryCountStatementSQL = buildUpdateRetryCountStatementSQL();
+ }
+ return connection.prepareStatement(updateRetryCountStatementSQL);
+ }
+
public PreparedStatement createInsertStatement(Connection connection) throws SQLException {
if (insertStatementSQL == null) {
insertStatementSQL = buildInsertStatementSQL();
@@ -134,29 +150,48 @@
private String buildSelect4UpdateStatementSQL() {
StringBuilder sb = new StringBuilder("select ");
+ sb = sb.append(epr.getDataColumn())
+ .append(" from ")
+ .append(epr.getTableName())
+ .append(" where ")
+ .append(epr.getMessageIdColumn()).append(" = ?")
+ .append(" and ").append(epr.getStatusColumn()).append(" = ?");
+
if (!epr.getURL().contains("hsqldb")) {
- sb = sb.append(
- epr.getDataColumn()).append(" from ").append(
- epr.getTableName()).append(" where ").append(
- epr.getMessageIdColumn()).append("=?").append(
- " and ").append(epr.getStatusColumn())
- .append("=?").append(" for update");
+ sb.append(" for update");
} else {
/*
* HSQL does not support FOR UPDATE! All tables appear to
* be inherently updatable!
*/
- sb = sb.append(
- epr.getDataColumn()).append(" from ").append(
- epr.getTableName()).append(" where ").append(
- epr.getMessageIdColumn()).append("=?").append(
- " and ").append(epr.getStatusColumn())
- .append("=?");
}
return sb.toString();
}
+ private String buildSelectPickupMessage() {
+ StringBuilder sb = new StringBuilder("select ");
+
+ sb = sb.append(epr.getDataColumn())
+ .append(" from ")
+ .append(epr.getTableName())
+ .append(" where ")
+ .append(epr.getMessageIdColumn()).append(" = ?")
+ .append(" and ").append(epr.getStatusColumn()).append(" = ?")
+ .append(" and ").append(epr.getTimestampColumn()).append(" = ?");
+
+ if (!epr.getURL().contains("hsqldb")) {
+ sb.append(" for update");
+ } else {
+ /*
+ * HSQL does not support FOR UPDATE! All tables appear to
+ * be inherently updatable!
+ */
+ }
+
+ return sb.toString();
+ }
+
private String buildUpdateStatusStatementSQL() {
StringBuilder sb = new StringBuilder("update ").append(
epr.getTableName()).append(" set ").append(
@@ -166,6 +201,20 @@
return sb.toString();
}
+ private String buildUpdateRetryCountStatementSQL() {
+ StringBuilder sb = new StringBuilder("update ")
+ .append(epr.getTableName())
+ .append(" set ")
+ .append(epr.getRetryCountColumn()).append("= ?, ")
+ .append(epr.getStatusColumn()).append("= ?, ")
+ .append(epr.getTimestampColumn()).append("= ?")
+ .append(" where ")
+ .append(epr.getMessageIdColumn()).append("=? and ")
+ .append(epr.getTimestampColumn()).append("=?");
+
+ return sb.toString();
+ }
+
private String buildInsertStatementSQL() {
StringBuilder sb = new StringBuilder();
@@ -185,7 +234,14 @@
sb.append("select ");
sb.append(epr.getMessageIdColumn()).append(", ");
- sb.append(epr.getTimestampColumn());
+ sb.append(epr.getTimestampColumn()).append(", ");
+ sb.append(epr.getStatusColumn());
+
+ String retryColumn = epr.getRetryCountColumn();
+ if(retryColumn != null) {
+ sb.append(", " + retryColumn);
+ }
+
sb.append(" from ").append(epr.getTableName());
sb.append(" where ").append(epr.getStatusColumn());
sb.append(" = '").append(SqlTableCourier.State.Pending.getColumnValue()).append("'");
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -32,6 +32,7 @@
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.PortReference;
import org.jboss.soa.esb.addressing.XMLUtil;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
@@ -57,6 +58,7 @@
public static final String ORDER_BY_TAG = "orderBy";
public static final String MESSAGE_ID_COLUMN_TAG = "message_id_column";
public static final String STATUS_COLUMN_TAG = "status_column";
+ public static final String RETRY_COUNT_COLUMN_TAG = "retry_count_column";
public static final String DATA_COLUMN_TAG = "message_column";
public static final String TIMESTAMP_COLUMN_TAG = "insert_timestamp_column";
public static final String WHERE_CONDITION_TAG = "whereCondition";
@@ -148,6 +150,11 @@
{
if (tag.equals(ERROR_DEL_TAG))
getAddr().addExtension(ERROR_DEL_TAG, nl.item(i).getTextContent());
+ else
+ {
+ if (tag.equals(RETRY_COUNT_COLUMN_TAG))
+ setRetryCountColumn(nl.item(i).getTextContent());
+ }
}
}
}
@@ -400,7 +407,36 @@
{
return getAddr().getExtensionValue(MESSAGE_ID_COLUMN_TAG);
}
-
+
+ /**
+ * Set the retry count column name that is used by this EPR.
+ *
+ * @param columnName the column name for the retry count.
+ * @throws URISyntaxException thrown if this EPR is malformed.
+ */
+
+ public final void setRetryCountColumn (String columnName) throws URISyntaxException
+ {
+ if(columnName == null) {
+ return;
+ }
+
+ if (retryCountSet)
+ throw new IllegalStateException("Retry Count column already set.");
+
+ getAddr().addExtension(RETRY_COUNT_COLUMN_TAG, columnName);
+ retryCountSet = true;
+ }
+
+ /**
+ * @return the retry count column used by this EPR.
+ */
+
+ public final String getRetryCountColumn ()
+ {
+ return getAddr().getExtensionValue(RETRY_COUNT_COLUMN_TAG);
+ }
+
/**
* Set the status column that is used by this EPR.
*
@@ -521,6 +557,7 @@
private boolean driverSet = false;
private boolean tableNameSet = false;
private boolean messageIdSet = false;
+ private boolean retryCountSet = false;
private boolean statusSet = false;
private boolean dataSet = false;
private boolean timestampSet = false;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -42,6 +42,7 @@
public static final String DEFAULT_INVM_SCOPE = "jboss.esb.invm.scope.default";
public static final String INVM_EXPIRY_TIME = "org.jboss.soa.esb.invm.expiryTime";
public static final String INVM_RETRY_LIMIT = "org.jboss.soa.esb.invm.retry.limit";
+ public static final String SQL_RETRY_LIMIT = "org.jboss.soa.esb.sql.retry.limit";
public static final String SMTP_HOST = "org.jboss.soa.esb.mail.smtp.host";
public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -360,6 +360,7 @@
epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
epr.setMessageIdColumn(getColName(tree, JDBCEpr.MESSAGE_ID_COLUMN_TAG));
+ epr.setRetryCountColumn(tree.getAttribute(JDBCEpr.RETRY_COUNT_COLUMN_TAG));
epr.setStatusColumn(getColName(tree, JDBCEpr.STATUS_COLUMN_TAG));
epr.setDataColumn(getColName(tree, JDBCEpr.DATA_COLUMN_TAG));
epr.setTimestampColumn(getColName(tree, JDBCEpr.TIMESTAMP_COLUMN_TAG));
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java 2009-10-19 11:34:45 UTC (rev 29655)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/SqlListenerMapper.java 2009-10-19 11:43:29 UTC (rev 29656)
@@ -135,6 +135,7 @@
toElement.setAttribute(JDBCEpr.WHERE_CONDITION_TAG, messageFilter.getWhereCondition());
toElement.setAttribute(JDBCEpr.ORDER_BY_TAG, messageFilter.getOrderBy());
toElement.setAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG, messageFilter.getMessageIdColumn());
+ toElement.setAttribute(JDBCEpr.RETRY_COUNT_COLUMN_TAG, messageFilter.getRetryCountColumn());
toElement.setAttribute(JDBCEpr.DATA_COLUMN_TAG, messageFilter.getMessageColumn());
toElement.setAttribute(JDBCEpr.STATUS_COLUMN_TAG, String.valueOf(messageFilter.getStatusColumn()));
toElement.setAttribute(JDBCEpr.TIMESTAMP_COLUMN_TAG, messageFilter.getInsertTimestampColumn());
More information about the jboss-svn-commits
mailing list