[jboss-svn-commits] JBL Code SVN: r17426 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product: rosetta/src/org/jboss/soa/esb/common and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Dec 28 20:37:43 EST 2007
Author: mark.little at jboss.com
Date: 2007-12-28 20:37:43 -0500 (Fri, 28 Dec 2007)
New Revision: 17426
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1283
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -22,10 +22,24 @@
package org.jboss.internal.soa.esb.couriers;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+
import org.apache.log4j.Logger;
import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
@@ -36,333 +50,430 @@
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXParseException;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.UUID;
+public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
+{
+ /**
+ * disable default constructor
+ */
+ private SqlTableCourier()
+ {
+ }
-public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier {
- /**
- * disable default constructor
- */
- private SqlTableCourier() {
- }
+ /**
+ * package protected constructor - Objects of Courier should only be
+ * instantiated by the Factory
+ *
+ * @param epr
+ */
+ SqlTableCourier(JDBCEpr epr) throws CourierException
+ {
+ this(epr, false);
+ }
- /**
- * package protected constructor - Objects of Courier should only be
- * instantiated by the Factory
- *
- * @param epr
- */
- SqlTableCourier(JDBCEpr epr) throws CourierException {
- this(epr, false);
- }
+ /**
+ * package protected constructor - Objects of Courier should only be
+ * instantiated by the Factory
+ *
+ * @param epr
+ */
+ SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException
+ {
+ _isReceiver = isReceiver;
+ _epr = epr;
+ _sleepForRetries = 3000; // TODO magic number - configurable?
+ try
+ {
+ _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ .getPostDelete()));
+ _errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
+ .getErrorDelete()));
+ }
+ catch (URISyntaxException e)
+ {
+ throw new CourierException(e);
+ }
- /**
- * package protected constructor - Objects of Courier should only be
- * instantiated by the Factory
- *
- * @param epr
- */
- SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException {
- _isReceiver = isReceiver;
- _epr = epr;
- _sleepForRetries = 3000; // TODO magic number - configurable?
- try {
- _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
- .getPostDelete()));
- _errorDelete = Boolean.TRUE.equals(Boolean.valueOf(epr
- .getErrorDelete()));
- }
- catch (URISyntaxException e) {
- throw new CourierException(e);
- }
+ } // ________________________________
- } // ________________________________
+ public void cleanup()
+ {
+ if (null != _conn)
+ {
+ try
+ {
+ _conn.release();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.info("Unable to release connection", e);
+ }
+ }
- public void cleanup() {
- if (null != _conn) {
- try {
- _conn.release();
- }
- catch (Exception e) {
- _logger.info("Unable to release connection");
- _logger.debug("Unable to release connection", e);
- }
- }
+ } // ________________________________
- } // ________________________________
+ /**
+ * package the ESB message in a java.io.Serializable, and write it.
+ * Delivery occurs within its own transaction.
+ *
+ * @param message
+ * Message - the message to deliverAsync
+ * @return boolean - the result of the delivery
+ * @throws CourierException -
+ * if problems were encountered
+ */
+
+ public boolean deliver(Message message) throws CourierException
+ {
+ if (_isReceiver)
+ throw new CourierException("This is a read-only Courier");
- /**
- * package the ESB message in a java.io.Serializable, and write it
- *
- * @param message Message - the message to deliverAsync
- * @return boolean - the result of the delivery
- * @throws CourierException -
- * if problems were encountered
- */
- public boolean deliver(Message message) throws CourierException {
- if (_isReceiver)
- throw new CourierException("This is a read-only Courier");
+ if (null == message)
+ return false;
- if (null == message)
- return false;
+ String msgId = null;
+ Call call = message.getHeader().getCall();
+ if (null==call)
+ message.getHeader().setCall(call=new Call());
+ try
+ {
+ if (null==call.getMessageID())
+ call.setMessageID(new URI(UUID.randomUUID().toString()));
+ msgId = call.getMessageID().toString();
+ }
+ catch (URISyntaxException e)
+ {
+ throw new CourierException("Problems with message header ",e);
+ }
- String msgId = null;
- Call call = message.getHeader().getCall();
- if (null == call)
- message.getHeader().setCall(call = new Call());
- try {
- if (null == call.getMessageID())
- call.setMessageID(new URI(UUID.randomUUID().toString()));
- msgId = call.getMessageID().toString();
- }
- catch (URISyntaxException e) {
- throw new CourierException("Problems with message header ", e);
- }
+ if (null == _conn)
+ {
+ try
+ {
+ _conn = getConn();
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ }
- if (null == _conn) {
- try {
- _conn = getConn();
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
- }
+ while (_conn != null)
+ {
+ try
+ {
+ int iCol = 1;
+ PreparedStatement PS = insertStatement();
+ PS.setString(iCol++, msgId);
+ PS.setObject(iCol++, Util.serialize(message));
+ PS.setString(iCol++, State.Pending.getColumnValue());
+ PS.setLong(iCol++, System.currentTimeMillis());
- while (_conn != null) {
- try {
- int iCol = 1;
- PreparedStatement PS = insertStatement();
- PS.setString(iCol++, msgId);
- PS.setObject(iCol++, Util.serialize(message));
- PS.setString(iCol++, State.Pending.getColumnValue());
- PS.setLong(iCol++, System.currentTimeMillis());
+ _conn.execUpdWait(PS, 3);
+ _conn.commit();
+ return true;
+ }
+ catch (SQLException e)
+ {
+ if (null != _conn)
+ {
+ try
+ {
+ _conn.rollback();
+ }
+ catch (Exception roll)
+ {
+ _logger.debug(roll);
+ }
+ }
+
+ _logger.debug("SQL exception during deliver", e);
+ throw new CourierException(e);
+ }
+ catch (Exception e)
+ {
+ jdbcConnectRetry(e);
+ }
+ }
+ return false;
+ } // ________________________________
- _conn.execUpdWait(PS, 3);
- _conn.commit();
- return true;
- }
- catch (SQLException e) {
- if (null != _conn) {
- try {
- _conn.rollback();
- }
- catch (Exception roll) {
- _logger.debug(roll);
- }
- }
-
- _logger.debug("SQL exception during deliver", e);
- throw new CourierException(e);
- }
- catch (Exception e) {
- jdbcConnectRetry(e);
- }
- }
- return false;
- } // ________________________________
-
- public Message pickup(long millis) throws CourierException, CourierTimeoutException {
- Message result = null;
- long limit = System.currentTimeMillis()
- + ((millis < 100) ? 100 : millis);
- do {
- try {
+ public Message pickup(long millis) throws CourierException, CourierTimeoutException
+ {
+ Message result = null;
+ long limit = System.currentTimeMillis()
+ + ((millis < 100) ? 100 : millis);
+
+ do
+ {
+ try
+ {
+ TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+ boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+
+ transactional = (txHandle != null);
+
+ /*
+ * Make sure the current transaction is still active! If we
+ * have previously slept, then the timeout may be longer than that
+ * associated with the transaction.
+ */
+
+ /*
+ * MessageAwareListener will catch exceptions and roll back the transaction.
+ */
+
+ if (transactional && !isActive)
+ {
+ throw new CourierException("Associated transaction is no longer active!");
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ _logger.error("Could not determine transaction association!", ex);
+
+ throw new CourierException("Could not determine transaction association!");
+ }
+
+ try
+ {
ResultSet RS = getRowList();
- while (null != RS && RS.next()) {
- String messageId = RS.getString(1);
- if (null == (result = tryToPickup(messageId)))
- continue;
- /*
- * If this is fault message, then throw an exception with the contents. With the
- * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
- */
+ while (null != RS && RS.next())
+ {
+ String messageId = RS.getString(1);
- if (Type.isFaultMessage(result))
- Factory.createExceptionFromFault(result);
-
- return result;
- }
+ if (null == (result = tryToPickup(messageId)))
+ continue;
+
+ /*
+ * If this is fault message, then throw an exception with the contents. With the
+ * exception of user-defined exceptions, faults will have nothing in the body, properties etc.
+ */
+
+ if (Type.isFaultMessage(result))
+ Factory.createExceptionFromFault(result);
+
+ return result;
+ }
}
+ catch (SQLException e)
+ {
+ _logger.debug("SQL Exception during pickup", e);
+ return null;
+ }
+ finally
+ {
+ // Added to make sure we release transactions from all paths
+ if (_conn != null)
+ {
+ try
+ {
+ if (!transactional)
+ _conn.rollback() ;
+ }
+ catch (final SQLException sqle) {} //ignore
+ }
+ }
+ try
+ {
+ long lSleep = limit - System.currentTimeMillis();
+ if (_pollLatency < lSleep)
+ lSleep = _pollLatency;
+ if (lSleep > 0)
+ Thread.sleep(lSleep);
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ } while (System.currentTimeMillis() <= limit);
+ return null;
+ } // ________________________________
- catch (SQLException e) {
- _logger.debug("SQL Exception during pickup", e);
- return null;
- }
- finally {
- // Added to make sure we release transactions from all paths
- if (_conn != null) {
- try {
- _conn.rollback();
- }
- catch (final SQLException sqle) {
- } //ignore
- }
- }
- try {
- long lSleep = limit - System.currentTimeMillis();
- if (_pollLatency < lSleep)
- lSleep = _pollLatency;
- if (lSleep > 0)
- Thread.sleep(lSleep);
- }
- catch (InterruptedException e) {
- return null;
- }
- } while (System.currentTimeMillis() <= limit);
- return null;
- } // ________________________________
+ private Message tryToPickup(String messageId) throws CourierException,
+ SQLException
+ {
+ int iParm = 1;
- private Message tryToPickup(String messageId) throws CourierException,
- SQLException {
- int iParm = 1;
+ select4UpdateStatement().setString(iParm++, messageId);
+ select4UpdateStatement().setString(iParm++,
+ State.Pending.getColumnValue());
- select4UpdateStatement().setString(iParm++, messageId);
- select4UpdateStatement().setString(iParm++,
- State.Pending.getColumnValue());
+ while (_conn != null)
+ {
+ try
+ {
+ ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
+ while (RS.next())
+ {
+ Exception eBad = null;
+ try
+ {
+ Message result = Util.deserialize((Serializable) RS
+ .getObject(1));
+ if (_postDelete)
+ deleteMsg(messageId);
+ else
+ changeStatus(messageId, State.Done);
+ return result;
+ }
+ catch (ClassCastException e)
+ {
+ eBad = e;
+ }
+ catch (SAXParseException e)
+ {
+ eBad = e;
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ if (null != eBad)
+ {
+ if (_errorDelete)
+ deleteMsg(messageId);
+ else
+ changeStatus(messageId, State.Error);
+ continue;
+ }
+ }
+ return null;
+ }
+ catch (SQLException e)
+ {
+ throw new CourierException(e);
+ }
+ catch (Exception e)
+ {
+ jdbcConnectRetry(e);
+ }
+ }
+ return null;
+ } // ________________________________
- while (_conn != null) {
- try {
- ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
- while (RS.next()) {
- Exception eBad = null;
- try {
- Message result = Util.deserialize((Serializable) RS
- .getObject(1));
- if (_postDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Done);
- return result;
- }
- catch (ClassCastException e) {
- eBad = e;
- }
- catch (SAXParseException e) {
- eBad = e;
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
- if (null != eBad) {
- if (_errorDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Error);
- continue;
- }
- }
- return null;
- }
- catch (SQLException e) {
- throw new CourierException(e);
- }
- catch (Exception e) {
- jdbcConnectRetry(e);
- }
- }
- return null;
- } // ________________________________
+ private void deleteMsg(String messageId) throws SQLException // binds messageId into the cached delete statement and executes it
+ {
+ int iParm = 1;
+ deleteStatement().setString(iParm++, messageId); // NOTE(review): deleteStatement() returns null when preparation fails (see its catch block), so this call can throw NullPointerException
+ _conn.execUpdWait(deleteStatement(), 3); // 3 is presumably the retry count - confirm against JdbcCleanConn.execUpdWait
+
+ if (!transactional) // flag is set in pickup() when the TransactionStrategy hands back a non-null transaction handle
+ _conn.commit(); // no surrounding transaction was detected, so commit the delete here
+ }
- private void deleteMsg(String messageId) throws SQLException {
- int iParm = 1;
- deleteStatement().setString(iParm++, messageId);
- _conn.execUpdWait(deleteStatement(), 3);
- _conn.commit();
- }
+ private void changeStatus(String messageId, State to) throws SQLException // binds the new state and the message id into the cached status-update statement and executes it
+ {
+ int iParm = 1;
+ updateStatusStatement().setString(iParm++, to.getColumnValue()); // first parameter: column value of the target State; NOTE(review): updateStatusStatement() returns null on failure, so this can NPE
+ updateStatusStatement().setString(iParm++, messageId); // second parameter: id of the message row to update
+ _conn.execUpdWait(updateStatusStatement(), 3); // 3 is presumably the retry count - confirm against JdbcCleanConn.execUpdWait
+
+ if (!transactional) // flag is set in pickup() when the TransactionStrategy hands back a non-null transaction handle
+ _conn.commit(); // no surrounding transaction was detected, so commit the status change here
+ }
- private void changeStatus(String messageId, State to) throws SQLException {
- int iParm = 1;
- updateStatusStatement().setString(iParm++, to.getColumnValue());
- updateStatusStatement().setString(iParm++, messageId);
- _conn.execUpdWait(updateStatusStatement(), 3);
- _conn.commit();
- }
+ private ResultSet getRowList() throws CourierException
+ {
+ if (null == _conn)
+ {
+ try
+ {
+ _conn = getConn();
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ }
+ while (_conn != null)
+ {
+ try
+ {
+ return _conn.execQueryWait(listStatement(), 3);
+ }
+ catch (Exception e)
+ {
+ _logger.debug("Problem encountered while executing query.", e);
+ e.printStackTrace();
+
+ jdbcConnectRetry(e);
+ }
+ }
+ return null;
- private ResultSet getRowList() throws CourierException {
- if (null == _conn) {
- try {
- _conn = getConn();
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
- }
- while (_conn != null) {
- try {
- return _conn.execQueryWait(listStatement(), 3);
- }
- catch (Exception e) {
- jdbcConnectRetry(e);
- }
- }
- return null;
+ } // _______________________________
- } // _______________________________
+ private void jdbcConnectRetry(Exception exc)
+ {
+ _logger.debug("DB problem, will try to reconnect", exc);
+
+ cleanup();
+ _conn = null;
- private void jdbcConnectRetry(Exception exc) {
- _logger.debug("DB problem, will try to reconnect", exc);
- if (null != _conn)
- _conn.release();
- _conn = null;
+ _prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
+ for (int i1 = 0; i1 < 3; i1++)
+ {
+ try
+ {
+ _conn = getConn();
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ Thread.sleep(_sleepForRetries);
+ }
+ catch (InterruptedException eInt)
+ {
+ return;
+ }
+ }
+ }
+ } // ________________________________
- _prepDelete = _prepGetList = _prepInsert = _prepSel4Upd = _prepUpdateStatus = null;
- for (int i1 = 0; i1 < 3; i1++) {
- try {
- _conn = getConn();
- }
- catch (Exception e) {
- try {
- Thread.sleep(_sleepForRetries);
- }
- catch (InterruptedException eInt) {
- return;
- }
- }
- }
- } // ________________________________
+ private JdbcCleanConn getConn() throws SQLException, MalformedEPRException, NamingException // lazily creates the JdbcCleanConn held in _conn and returns it
+ {
+ if (null == _conn) // an existing wrapper is returned unchanged
+ {
+ try
+ {
+ DataSource DS = null;
+ if (_epr.getDatasource() == null) { // no datasource name on the EPR: build one from driver, URL, user name and password
+ DS = new SimpleDataSource(_epr.getDriver(),
+ _epr.getURL(), _epr.getUserName(), _epr.getPassword());
+ } else { // otherwise look the named DataSource up through JNDI
+ InitialContext initContext;
+ try {
+ initContext = new InitialContext();
+ DS = (DataSource) initContext.lookup(_epr.getDatasource());
+ } catch (NamingException e) {
+ _logger.error("Problem resolving DataSource through JNDI", e);
+
+ throw e; // rethrown after logging; callers such as deliver() and getRowList() wrap it in CourierException
+ }
+ }
+ _conn = new JdbcCleanConn(DS, transactional); // hands the courier's current transactional flag to the connection wrapper
+ }
+ catch (URISyntaxException ex) // raised by the EPR accessors; reported to the caller as a malformed EPR
+ {
+ throw new MalformedEPRException(ex);
+ }
+ }
+ return _conn;
+ } // ________________________________
- private JdbcCleanConn getConn() throws SQLException, MalformedEPRException {
- if (null == _conn) {
- try {
- DataSource DS = null;
- if (_epr.getDatasource() == null) {
- DS = new SimpleDataSource(_epr.getDriver(),
- _epr.getURL(), _epr.getUserName(), _epr.getPassword());
- } else {
- InitialContext initContext;
- try {
- initContext = new InitialContext();
- DS = (DataSource) initContext.lookup(_epr.getDatasource());
- } catch (NamingException e) {
- _logger.error("", e);
- }
- }
- _conn = new JdbcCleanConn(DS);
- }
- catch (URISyntaxException ex) {
- throw new MalformedEPRException(ex);
- }
- }
- return _conn;
- } // ________________________________
-
protected PreparedStatement listStatement() throws SQLException
{
- if (null == _prepGetList)
+ if (null == _prepGetList)
+ {
+ try
+ {
+ String[] columns =
+ { _epr.getMessageIdColumn(), _epr.getTimestampColumn() };
- try {
- String[] columns =
- {_epr.getMessageIdColumn(), _epr.getTimestampColumn()};
-
StringBuilder sb = new StringBuilder("select");
int i1 = 0;
for (String col : columns)
@@ -379,15 +490,16 @@
}
catch (Exception e)
{
- _logger.warn("Unable to prepare SQL statement", e);
-
- throw new SQLException("Unable to prepare SQL statement: "+e);
+ e.printStackTrace();
+ _logger.debug("Unable to prepare SQL statement", e);
+ throw new SQLException("Problem encountered when trying to created PreparedStatement: "+e);
}
-
+ }
+
return _prepGetList;
} // ________________________________
- protected PreparedStatement select4UpdateStatement() throws SQLException
+ protected PreparedStatement select4UpdateStatement()
{
if (_prepSel4Upd == null)
{
@@ -397,46 +509,45 @@
* TODO make this dynamic using a factory pattern.
*/
- StringBuilder sb = null;
+ StringBuilder sb = null;
- if (!_epr.getURL().contains("hsqldb")) {
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?").append(" for update");
- } else {
- /*
- * HSQL does not support FOR UPDATE! All tables appear to
- * be inherently updatable!
- */
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?");
- }
+ if (!_epr.getURL().contains("hsqldb"))
+ {
+ sb = new StringBuilder("select ").append(
+ _epr.getDataColumn()).append(" from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(_epr.getStatusColumn())
+ .append("=?").append(" for update");
+ }
+ else
+ {
+ /*
+ * HSQL does not support FOR UPDATE! All tables appear to
+ * be inherently updatable!
+ */
+
+ sb = new StringBuilder("select ").append(
+ _epr.getDataColumn()).append(" from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append("=?").append(
+ " and ").append(_epr.getStatusColumn())
+ .append("=?");
+ }
_prepSel4Upd = getConn().prepareStatement(sb.toString());
}
- catch (SQLException ex)
- {
- throw ex;
- }
catch (Exception e)
{
- _logger.warn(e);
-
- throw new SQLException("Caught exception during prepared statement: "+e);
+ _logger.debug(e);
+ return null;
}
}
-
+
return _prepSel4Upd;
- }
-
- protected PreparedStatement updateStatusStatement() throws SQLException
+ } // ________________________________
+
+ protected PreparedStatement updateStatusStatement()
{
if (null == _prepUpdateStatus)
try
@@ -447,25 +558,23 @@
.append(_epr.getMessageIdColumn()).append("=?");
_prepUpdateStatus = getConn().prepareStatement(sb.toString());
}
- catch (SQLException ex)
- {
- throw ex;
- }
catch (Exception e)
{
- _logger.warn(e);
-
- throw new SQLException("Caught exception during prepared statement: "+e);
+ _logger.debug(e);
+ return null;
}
return _prepUpdateStatus;
} // ________________________________
- protected PreparedStatement insertStatement() throws SQLException {
- if (null == _prepInsert)
- try {
- String[] columns =
- {_epr.getMessageIdColumn(), _epr.getDataColumn(),
- _epr.getStatusColumn(), _epr.getTimestampColumn()};
+ protected PreparedStatement insertStatement()
+ {
+ if (null == _prepInsert)
+ try
+ {
+ String[] columns =
+ { _epr.getMessageIdColumn(), _epr.getDataColumn(),
+ _epr.getStatusColumn(), _epr.getTimestampColumn() };
+
StringBuilder sb = new StringBuilder("insert into ").append(
_epr.getTableName()).append("(");
int i1 = 0;
@@ -474,20 +583,15 @@
sb.append(") values (?,?,?,?)");
_prepInsert = getConn().prepareStatement(sb.toString());
}
- catch (SQLException ex)
- {
- throw ex;
- }
catch (Exception e)
{
- _logger.warn(e);
-
- throw new SQLException("Caught exception during prepared statement: "+e);
+ _logger.debug(e);
+ return null;
}
return _prepInsert;
} // ________________________________
- protected PreparedStatement deleteStatement() throws SQLException
+ protected PreparedStatement deleteStatement()
{
if (null == _prepDelete)
try
@@ -497,49 +601,48 @@
_epr.getMessageIdColumn()).append(" =?");
_prepDelete = getConn().prepareStatement(sb.toString());
}
- catch (SQLException ex)
- {
- throw ex;
- }
catch (Exception e)
{
- _logger.warn(e);
-
- throw new SQLException("Caught exception during prepared statement: "+e);
+ _logger.debug(e);
+ return null;
}
return _prepDelete;
} // ________________________________
- protected enum State {
- Pending, WorkInProgress, Done, Error;
+ protected enum State // states written to and matched against the status column of the message table
+ {
+ Pending, WorkInProgress, Done, Error;
+ String getColumnValue() // value stored in the status column for this state
+ {
+ return toString().substring(0, 1); // first character of the constant name: "P", "W", "D" or "E"
+ }
+ }
- String getColumnValue() {
- return toString().substring(0, 1);
- }
- }
+ public void setPollLatency(Long millis) // updates _pollLatency, the cap applied to the sleep between polls in pickup(); millis is unboxed, so null throws NullPointerException
+ {
+ if (millis <= 200) // NOTE(review): exactly 200 is rejected here although the warning below says ">= 200" - confirm which is intended
+ _logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of "+_pollLatency);
+ else
+ _pollLatency = millis;
+ } // ________________________________
+
+ protected long _pollLatency = 200;
+ protected long _sleepForRetries = 3000; // milliseconds
- public void setPollLatency(Long millis) {
- if (millis <= 200)
- _logger.warn("Poll latency must be >= 200 milliseconds - Keeping old value of " + _pollLatency);
- else
- _pollLatency = millis;
- } // ________________________________
+ protected boolean _postDelete, _errorDelete;
+ protected boolean _isReceiver;
- protected long _pollLatency = 200;
- protected long _sleepForRetries = 3000; // milliseconds
+ protected JDBCEpr _epr;
- protected boolean _postDelete, _errorDelete;
- protected boolean _isReceiver;
+ protected JdbcCleanConn _conn;
- protected JDBCEpr _epr;
+ protected PreparedStatement _prepGetList;
+ protected PreparedStatement _prepSel4Upd;
+ protected PreparedStatement _prepUpdateStatus;
+ protected PreparedStatement _prepInsert;
+ protected PreparedStatement _prepDelete;
+
+ private boolean transactional = false;
- protected JdbcCleanConn _conn;
-
- protected PreparedStatement _prepGetList;
- protected PreparedStatement _prepSel4Upd;
- protected PreparedStatement _prepUpdateStatus;
- protected PreparedStatement _prepInsert;
- protected PreparedStatement _prepDelete;
-
- protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
-}
+ protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
+}
\ No newline at end of file
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -27,6 +27,7 @@
import java.net.URL;
import javax.transaction.Status;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.log4j.Logger;
@@ -207,5 +208,82 @@
throw new TransactionStrategyException("Failed to mark the transaction on current thread for rollback", th) ;
}
}
+
+ /**
+ * Get a handle on the currently associated transaction (or null).
+ * Delegates to <code>tm.getTransaction()</code> and returns its result
+ * untyped, so callers only see an opaque handle.
+ *
+ * @return the result of <code>tm.getTransaction()</code>; SqlTableCourier.pickup
+ * treats a null handle as "not running inside a transaction".
+ * @throws TransactionStrategyException if the lookup throws any Throwable;
+ * the Throwable is passed to the exception as its second constructor argument.
+ */
+
+ public Object getTransaction () throws TransactionStrategyException
+ {
+ try
+ {
+ return tm.getTransaction();
+ }
+ catch (final Throwable th)
+ {
+ throw new TransactionStrategyException("Problem when trying to getTransaction: ",th);
+ }
+ }
+
+ /**
+ * Suspend the current thread-to-transaction association by delegating
+ * to <code>tm.suspend()</code>.
+ *
+ * @return the associated transaction, or null. The value is whatever
+ * <code>tm.suspend()</code> returns and can later be handed to
+ * {@link #resume(Object)}.
+ * @throws TransactionStrategyException if the suspend call throws any
+ * Throwable; the Throwable is passed to the exception as its second
+ * constructor argument.
+ */
+ public Object suspend () throws TransactionStrategyException
+ {
+ try
+ {
+ return tm.suspend();
+ }
+ catch (final Throwable th)
+ {
+ throw new TransactionStrategyException("Problem when trying to suspend transaction: ",th);
+ }
+ }
+
+ /**
+ * Associate the transaction with the current thread by delegating to
+ * <code>tm.resume()</code>.
+ *
+ * @param tx the transaction handle to resume. It is cast to
+ * <code>javax.transaction.Transaction</code> inside the try block, so a
+ * handle of any other type surfaces as a TransactionStrategyException
+ * rather than a ClassCastException.
+ * @throws TransactionStrategyException if the cast or the resume call
+ * throws any Throwable; the Throwable is passed to the exception as its
+ * second constructor argument.
+ */
+ public void resume (Object tx) throws TransactionStrategyException
+ {
+ try
+ {
+ tm.resume((Transaction) tx);
+ }
+ catch (final Throwable th)
+ {
+ throw new TransactionStrategyException("Problem when trying to resume transaction: ",th);
+ }
+ }
+
+ /**
+ * Is the currently associated transaction active?
+ * @return true only when <code>tm.getStatus()</code> equals
+ * <code>Status.STATUS_ACTIVE</code>; false for every other status value.
+ * @throws TransactionStrategyException if reading the status throws any
+ * Throwable; the Throwable is passed to the exception as its second
+ * constructor argument.
+ */
+ public boolean isActive () throws TransactionStrategyException
+ {
+ try
+ {
+ if (tm.getStatus() == Status.STATUS_ACTIVE)
+ return true;
+ else
+ return false;
+ }
+ catch (final Throwable th)
+ {
+ throw new TransactionStrategyException("Problem when trying to get transaction status: ",th);
+ }
+ }
+
+ public String toString () // returns a fixed label naming this strategy implementation
+ {
+ return "JTATransactionStrategy";
+ }
}
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -91,6 +91,52 @@
throws TransactionStrategyException ;
/**
+ * Get a handle on the currently associated transaction (or null).
+ *
+ * @return an opaque transaction handle, or null. Of the implementations
+ * changed alongside this declaration, the JTA one returns
+ * <code>tm.getTransaction()</code> and the null strategy always returns null.
+ * @throws TransactionStrategyException if the implementation cannot
+ * determine the current transaction.
+ */
+
+ public abstract Object getTransaction () throws TransactionStrategyException;
+
+ /**
+ * Suspend the current thread-to-transaction association.
+ *
+ * @return the associated transaction, or null. The handle is opaque
+ * (declared as Object) and is the kind of value {@link #resume(Object)}
+ * accepts.
+ * @throws TransactionStrategyException if the implementation fails to
+ * suspend the association.
+ */
+ public abstract Object suspend () throws TransactionStrategyException;
+
+ /**
+ * Is the currently associated transaction active?
+ * @return true if the implementation considers the current transaction
+ * active. Of the implementations changed alongside this declaration, the
+ * JTA one compares the status with <code>Status.STATUS_ACTIVE</code> and
+ * the null strategy always returns false.
+ * @throws TransactionStrategyException if the implementation cannot
+ * determine the transaction status.
+ */
+ public abstract boolean isActive () throws TransactionStrategyException;
+
+ /**
+ * Associate the transaction with the current thread.
+ * @param tx the opaque transaction handle to associate; presumably one
+ * previously returned by {@link #suspend()} or {@link #getTransaction()} -
+ * confirm against callers.
+ * @throws TransactionStrategyException if the implementation fails to
+ * resume the transaction.
+ */
+ public abstract void resume (Object tx) throws TransactionStrategyException;
+
+ public static void setStrategy (TransactionStrategy txSt) // records txSt as the strategy for the calling thread only
+ {
+ _currentStrategy.set(txSt); // _currentStrategy is a ThreadLocal, so other threads are unaffected
+ }
+
+ public static TransactionStrategy getStrategy () // returns the strategy set by the calling thread, or null if it has set none
+ {
+ return _currentStrategy.get(); // plain ThreadLocal with no initial value, hence null by default; SqlTableCourier.pickup checks for null
+ }
+
+ public static void removeStrategy () // clears the calling thread's strategy so a later getStrategy() on this thread returns null
+ {
+ _currentStrategy.remove();
+ }
+
+ private final static ThreadLocal<TransactionStrategy> _currentStrategy = new ThreadLocal<TransactionStrategy>();
+
+ /**
* The null transaction strategy.
* @author kevin
*/
@@ -124,5 +170,50 @@
throws TransactionStrategyException
{
}
+
+ /**
+ * Get a handle on the currently associated transaction (or null).
+ * The null strategy has no transaction, so this always returns null.
+ *
+ * @return always null.
+ * @throws TransactionStrategyException declared to match the abstract
+ * signature; this implementation does not throw it.
+ */
+
+ public Object getTransaction () throws TransactionStrategyException
+ {
+ return null;
+ }
+
+ /**
+ * Suspend the current thread-to-transaction association.
+ * The null strategy has nothing to suspend, so this does nothing.
+ *
+ * @return always null.
+ * @throws TransactionStrategyException declared to match the abstract
+ * signature; this implementation does not throw it.
+ */
+ public Object suspend () throws TransactionStrategyException
+ {
+ return null;
+ }
+
+ /**
+ * Associate the transaction with the current thread.
+ * The null strategy ignores the request; the method body is empty.
+ *
+ * @param tx ignored.
+ * @throws TransactionStrategyException declared to match the abstract
+ * signature; this implementation does not throw it.
+ */
+ public void resume (Object tx) throws TransactionStrategyException
+ {
+ }
+
+ /**
+ * Is the currently associated transaction active?
+ * The null strategy never has a transaction, so the answer is always no.
+ *
+ * @return always false.
+ * @throws TransactionStrategyException declared to match the abstract
+ * signature; this implementation does not throw it.
+ */
+ public boolean isActive () throws TransactionStrategyException
+ {
+ return false;
+ }
+
+ public String toString () // returns a fixed label naming this strategy implementation
+ {
+ return "NullTransactionStrategy";
+ }
}
}
\ No newline at end of file
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -38,6 +38,8 @@
private DataSource m_oDS = null;
private Connection m_conn = null;
+
+ private boolean transactional = false;
protected List<PreparedStatement> m_olPrepSt = new ArrayList<PreparedStatement>();
@@ -45,13 +47,19 @@
public JdbcCleanConn(DataSource p_oDS)
{
+ this(p_oDS, false);
+ }
+
+ public JdbcCleanConn(DataSource p_oDS, boolean transactional)
+ {
m_oDS = p_oDS;
m_oLogger = Logger.getLogger(this.getClass());
+ this.transactional = transactional;
}
public void commit() throws SQLException
{
- if (null != m_conn)
+ if ((null != m_conn) && (!transactional))
{
m_conn.commit();
}
@@ -59,7 +67,7 @@
public void rollback() throws SQLException
{
- if (null != m_conn)
+ if ((null != m_conn) && (!transactional))
{
m_conn.rollback();
} else {
@@ -73,7 +81,7 @@
public void release()
{
- if (null != m_conn)
+ if ((null != m_conn) && (!transactional))
{
try
{
@@ -135,6 +143,9 @@
connect();
}
+ if (p_PS == null)
+ throw new SQLException("Null PreparedStatement!");
+
SQLException eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
for (int i1 = 0; i1 < iQtry; i1++)
@@ -172,6 +183,9 @@
connect();
}
+ if (p_PS == null)
+ throw new SQLException("Null PreparedStatement!");
+
SQLException eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
for (int i1 = 0; i1 < iQtry; i1++)
@@ -250,8 +264,11 @@
throw new RuntimeException("connect() FAILED: no connection");
}
- m_conn.setAutoCommit(false);
- m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ if (!transactional)
+ {
+ m_conn.setAutoCommit(false);
+ m_conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ }
m_olPrepSt.clear();
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -57,6 +57,7 @@
/** Listeners */
public static final String LISTENER_CLASS_TAG = "listenerClass";
public static final String TRANSACTED_TAG = "transacted";
+ public static final String ROLLBACK_ON_PIPELINE_FAULTS = "rollbackOnPipelineFaults";
/** Deployment */
public static final String DEPLOYMENT_NAME_TAG = "deployment";
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -271,9 +271,18 @@
boolean bErrorDel = Boolean.valueOf(tree.getAttribute(
JDBCEpr.ERROR_DEL_TAG, "true"));
JDBCEpr epr = new JDBCEpr(url, bPostDel, bErrorDel);
- epr.setDriver(tree.getRequiredAttribute(JDBCEpr.DRIVER_TAG));
- epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
- epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
+
+ String datasource = tree.getAttribute(JDBCEpr.DATASOURCE_TAG);
+
+ if (datasource == null)
+ {
+ epr.setDriver(tree.getRequiredAttribute(JDBCEpr.DRIVER_TAG));
+ epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
+ epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
+ }
+ else
+ epr.setDatasource(datasource);
+
epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
epr.setMessageIdColumn(getColName(tree, JDBCEpr.MESSAGE_ID_COLUMN_TAG));
epr.setStatusColumn(getColName(tree, JDBCEpr.STATUS_COLUMN_TAG));
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -65,10 +65,9 @@
}
if (provider.getDatasource() != null) {
- if ((provider.getUrl() != null)
- || (provider.getUsername() != null)
+ if ((provider.getUsername() != null)
|| (provider.getDriver() != null)) {
- throw new ConfigurationException ("Invalid sql-provider configuration : a datasource and a URL/username/password/driver "
+ throw new ConfigurationException ("Invalid sql-provider configuration : a datasource and a username/password/driver "
+ "combination cannot both be defined. Use only one (datasource or JDBC connection info)."
+ "Datasource : [" + provider.getDatasource() + "] JDBC URL [" + provider.getUrl() + "]");
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -32,6 +32,8 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -85,6 +87,10 @@
* The error delay.
*/
private long errorDelay ;
+
+ private TransactionStrategy transactionStrategy;
+ private boolean transactional = false;
+ private boolean rollbackOnPipelineFaults = true;
/**
* public constructor
@@ -156,6 +162,11 @@
}
}
_latencySecs = lSeconds ;
+
+ transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+ transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
+
+ rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
}
/**
@@ -261,17 +272,53 @@
}
}
+ /**
+ * We have JMS transactional delivery/work semantics: before pulling a unit of work
+ * we start a transaction. If the pipeline completes successfully then we will
+ * commit that transaction and the UOW will be deleted. If we have to roll back
+ * the transaction then the UOW will be placed back on the input "queue" (assumes that
+ * the courier is transactional).
+ *
+ * @param maxWaitMillis
+ */
public void waitForEventAndProcess (long maxWaitMillis)
{
Message message = null ;
+ boolean problem = false;
+
try
{
+ transactionStrategy.begin();
+
+ /*
+ * If this is a transactional receive then the courier
+ * needs to be reset afterwards, because we can only
+ * guarantee one instance per transaction. If the courier
+ * instance does some handy multiplexing internally across different
+ * transactions then we won't be able to handle that at this level.
+ * However, at the moment that isn't an issue.
+ */
+
+ TransactionStrategy.setStrategy(transactionStrategy);
+
message = (maxWaitMillis > 0) ? _pickUpCourier
.pickup(maxWaitMillis) : null;
errorDelay = 0 ;
}
+ catch (TransactionStrategyException ex)
+ {
+ // could not begin transaction!
+
+ _logger.error("Could not begin transaction!");
+
+ problem = true;
+
+ return;
+ }
catch (CourierTimeoutException e)
{
+ problem = true;
+
return;
}
catch (FaultMessageException fme)
@@ -280,35 +327,79 @@
}
catch (CourierException e)
{
- _logger.debug("Courier Exception", e);
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+ _logger.debug("Courier Exception", e);
+ if (errorDelay == 0)
+ {
+ errorDelay = MIN_ERROR_DELAY ;
+ }
+ else if (errorDelay < MAX_ERROR_DELAY)
+ {
+ errorDelay <<= 1 ;
+ }
+ e.printStackTrace();
+ _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
+
+ problem = true;
+
return;
}
+ finally
+ {
+ if (problem)
+ {
+ try
+ {
+ if (transactionStrategy.getTransaction() != null)
+ {
+ CourierUtil.cleanCourier(_pickUpCourier);
+
+ resetCourier();
+ }
+ }
+ catch (Throwable ex)
+ {
+ CourierUtil.cleanCourier(_pickUpCourier);
+
+ resetCourier(); // to be on the safe side!
+ }
+
+ rollbackTransaction();
+ }
+
+ TransactionStrategy.removeStrategy();
+ }
if (null != message)
{
- final Message pipelineMessage = message ;
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(pipelineMessage) ;
- } finally {
- updateThreadCount(-1) ;
- }
- }
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
+ try
+ {
+ final Message pipelineMessage = message ;
+ final Object txHandle = transactionStrategy.suspend();
+ final TransactionalRunner txRunner = new TransactionalRunner(_pickUpCourier, pipelineMessage, txHandle);
+
+ updateThreadCount(+1);
+ _execService.execute(txRunner);
+
+ if (transactional)
+ {
+ _pickUpCourier = null; // runner will clean it up.
+
+ resetCourier(); // we need another courier for the next msg.
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ _logger.warn("Caught transaction related exception: ", ex);
+ rollbackTransaction();
+ }
}
+ else
+ {
+ // nothing to do, so roll back the transaction before returning.
+
+ rollbackTransaction();
+ }
} // ________________________________
@@ -392,7 +483,134 @@
}
}
}
+
+ private void rollbackTransaction ()
+ {
+ try
+ {
+ transactionStrategy.rollbackOnly();
+ transactionStrategy.terminate();
+ }
+ catch (Throwable ex)
+ {
+ _logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
+ }
+ }
+
+ private void resetCourier ()
+ {
+ TwoWayCourier pickUpCourier = null;
+ try
+ {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
+ try
+ {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[] { Long.class });
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+ }
+ catch (final NoSuchMethodException nsme)
+ {
+ // OK, just leave it null
+ }
+ catch (final Exception ex)
+ {
+ CourierUtil.cleanCourier(pickUpCourier);
+
+ _logger.error("Problems invoking setPollLatency(long)", ex);
+ }
+ }
+ catch (final MalformedEPRException mepre)
+ {
+ _logger.error("Malformed EPR: " + _epr) ;
+ }
+ catch (final CourierException ce)
+ {
+ _logger.error("No appropriate courier can be obtained for " + _epr, ce);
+ }
+
+ _pickUpCourier = pickUpCourier;
+ }
+
+ class TransactionalRunner implements Runnable
+ {
+ public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
+ {
+ _courier = courier;
+ _pipelineMessage = pipelineMessage;
+ _txHandle = txHandle;
+ }
+
+ public void run()
+ {
+ boolean problem = false;
+
+ try
+ {
+ transactionStrategy.resume(_txHandle);
+
+ pipeline.setTransactional(transactional);
+
+ TransactionStrategy.setStrategy(transactionStrategy);
+
+ /*
+ * Current strategy is to commit as long as process returns true.
+ * If fails, or any exceptions are caught, then we roll back.
+ *
+ * TODO re-examine the semantics around true/false from the pipeline.
+ */
+
+ // TODO consider adding a RollbackOnFalse option to allow override.
+
+ problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
+
+ if (!problem)
+ {
+ transactionStrategy.terminate();
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught transaction exception: ", ex);
+ }
+ catch (RuntimeException ex)
+ {
+ problem = true;
+
+ throw ex;
+ }
+ catch (Throwable ex)
+ {
+ problem = true;
+
+ _logger.warn("TransactionalRunner caught throwable: ",ex);
+ }
+ finally
+ {
+ if (problem)
+ {
+ rollbackTransaction();
+ }
+
+ if (transactional)
+ {
+ CourierUtil.cleanCourier(_courier);
+ }
+
+ TransactionStrategy.removeStrategy();
+
+ updateThreadCount(-1);
+ }
+ }
+
+ private PickUpOnlyCourier _courier;
+ private Message _pipelineMessage;
+ private Object _txHandle;
+ };
+
private ConfigTree _config;
private String _eprCategoryName;
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -26,6 +26,8 @@
import org.apache.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
import org.jboss.soa.esb.monitoring.StatisticsBean;
import org.jboss.soa.esb.monitoring.StatisticsData;
@@ -160,6 +162,21 @@
* @param f_sb statistics bean
*/
public void insertStatistics(StatisticsBean f_sb) {
+ TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ Object txHandle = null;
+
+ if (txStrategy != null)
+ {
+ try
+ {
+ txHandle = txStrategy.suspend();
+ }
+ catch (TransactionStrategyException ex)
+ {
+ // if it failed, then the next work will fail too.
+ }
+ }
+
Session sess = null;
Transaction tx = null;
try {
@@ -187,6 +204,18 @@
sess.flush();
sess.close();
sess = null;
+
+ if (txHandle != null)
+ {
+ try
+ {
+ txStrategy.resume(txHandle);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ logger.error("Problem resuming transaction!", ex);
+ }
+ }
}
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -25,6 +25,8 @@
import org.apache.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
import org.jboss.soa.esb.monitoring.pojo.JMXOperation;
import org.jboss.soa.esb.monitoring.pojo.JMXOperationResult;
@@ -65,6 +67,20 @@
* @see org.jboss.soa.esb.monitoring.server.Filer#persistData()
*/
public void persistData() {
+ TransactionStrategy txS = TransactionStrategy.getStrategy();
+ Object txHandle = null;
+
+ if (txS != null)
+ {
+ try
+ {
+ txHandle = txS.suspend();
+ }
+ catch (TransactionStrategyException ex)
+ {
+ }
+ }
+
Session sess = null;
Transaction tx = null;
try {
@@ -86,6 +102,18 @@
sess.flush();
sess.close();
sess = null;
+
+ if (txHandle != null)
+ {
+ try
+ {
+ txS.resume(txHandle);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ logger.error("Problem resuming transaction!", ex);
+ }
+ }
}
}
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java 2007-12-29 01:36:35 UTC (rev 17425)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java 2007-12-29 01:37:43 UTC (rev 17426)
@@ -28,6 +28,8 @@
import org.hibernate.Query;
import org.hibernate.Session;
import org.hibernate.Transaction;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.message.body.content.ServiceControlCommand;
import org.jboss.soa.esb.monitoring.MonitoringSessionFactory;
import org.jboss.soa.esb.monitoring.OperationsData;
@@ -93,6 +95,20 @@
public void insertOperations(ServiceControlCommand f_ob) {
Session sess = null;
Transaction tx = null;
+ TransactionStrategy txS = TransactionStrategy.getStrategy();
+ Object txHandle = null;
+
+ if (txS != null)
+ {
+ try
+ {
+ txHandle = txS.suspend();
+ }
+ catch (TransactionStrategyException ex)
+ {
+ }
+ }
+
try {
sess = (Session) MonitoringSessionFactory.getInstance().openSession();
tx = sess.beginTransaction();
@@ -119,6 +135,18 @@
sess.flush();
sess.close();
sess = null;
+
+ if (txHandle != null)
+ {
+ try
+ {
+ txS.resume(txHandle);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ logger.error("Problem resuming transaction!", ex);
+ }
+ }
}
}
@@ -130,6 +158,20 @@
public void updateActiveFlag(String serverName) {
Session sess = null;
Transaction tx = null;
+ TransactionStrategy txS = TransactionStrategy.getStrategy();
+ Object txHandle = null;
+
+ if (txS != null)
+ {
+ try
+ {
+ txHandle = txS.suspend();
+ }
+ catch (TransactionStrategyException ex)
+ {
+ }
+ }
+
try {
sess = (Session) MonitoringSessionFactory.getInstance().openSession();
tx = sess.beginTransaction();
@@ -151,6 +193,18 @@
sess.flush();
sess.close();
sess = null;
+
+ if (txHandle != null)
+ {
+ try
+ {
+ txS.resume(txHandle);
+ }
+ catch (TransactionStrategyException ex)
+ {
+ logger.error("Problem resuming transaction!", ex);
+ }
+ }
}
}
More information about the jboss-svn-commits
mailing list