[jboss-svn-commits] JBL Code SVN: r16982 - in labs/jbossesb/workspace/SQLCourier_XA/product: rosetta/src/org/jboss/internal/soa/esb/message and 17 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Dec 3 06:08:10 EST 2007
Author: tfennelly
Date: 2007-12-03 06:08:09 -0500 (Mon, 03 Dec 2007)
New Revision: 16982
Added:
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlCourierRollbackTerminateStrategy.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TransactedPickUpOnlyCourier.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/RollbackStrategy.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TerminateStrategy.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TransactedMessageDeliveryContext.java
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/build.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jboss-esb.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-properties.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-service.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jndi.properties
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/juddi.properties
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/lib/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/listener.log
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/log4j.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/populate.sql
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/quickstart-ds.xml
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/readme.txt
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/hsqldb/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/hsqldb/create.sql
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/
labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/MyAction.java
Modified:
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessorMethodInfo.java
labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Changes made around a failed attempt to sort out http://jira.jboss.com/jira/browse/JBESB-1283
Added: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlCourierRollbackTerminateStrategy.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlCourierRollbackTerminateStrategy.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlCourierRollbackTerminateStrategy.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,75 @@
+/*
+ Milyn - Copyright (C) 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License (version 2.1) as published by the Free Software
+ Foundation.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+ See the GNU Lesser General Public License for more details:
+ http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.internal.soa.esb.couriers;
+
+import org.jboss.internal.soa.esb.message.tx.RollbackStrategy;
+import org.jboss.internal.soa.esb.message.tx.TerminateStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.couriers.CourierException;
+
+import java.sql.SQLException;
+
+/**
+ * SQLTableCourier rollback strategy.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class SqlCourierRollbackTerminateStrategy implements RollbackStrategy, TerminateStrategy {
+
+ private SqlTableCourier courier;
+ private SqlTableCourier.CourierMessageRecord messageRecord;
+ private boolean postDelete;
+
+ public SqlCourierRollbackTerminateStrategy(SqlTableCourier courier, SqlTableCourier.CourierMessageRecord messageRecord, boolean postDelete) {
+ this.courier = courier;
+ this.messageRecord = messageRecord;
+ this.postDelete = postDelete;
+ }
+
+ public void executeRollbackStrategy() throws TransactionStrategyException {
+ if(postDelete) {
+ try {
+ courier.insertRecord(messageRecord, false);
+ } catch (CourierException e) {
+ throw new TransactionStrategyException("Failed to reinsert message.", e);
+ }
+ } else {
+ try {
+ courier.resetStatus(messageRecord);
+ } catch (SQLException e) {
+ throw new TransactionStrategyException("Failed to reset message status.", e);
+ }
+ }
+ }
+
+ public boolean executeRollback() {
+ // We're not actually going to do a rollback because we're not rolling
+ // anything back... instead, the SQLCourier compensates through reinserting
+ // the message, or reseting its status as above.
+ return false;
+ }
+
+ public void executeTerminateStrategy() throws TransactionStrategyException {
+ /*
+ try {
+ courier._conn.commit();
+ } catch (SQLException e) {
+ throw new TransactionStrategyException("Failed to commit transaction.", e);
+ } finally {
+ }
+ */
+ courier.inProcessMessages.remove(messageRecord.getMessageId());
+ }
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlCourierRollbackTerminateStrategy.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -23,8 +23,8 @@
package org.jboss.internal.soa.esb.couriers;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.message.tx.TransactedMessageDeliveryContext;
import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -42,12 +42,27 @@
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
-public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier {
+public class SqlTableCourier implements TransactedPickUpOnlyCourier, DeliverOnlyCourier {
+
+ // TODO: May not need inProcessMessages... was added as a check to see would the BG thread pick up on the same
+ // rows again, while those rows are being processed and before they're deleted via the transaction
+ // commit. I suspect JTA makes sure this doesn't happen...
+ protected Set<String> inProcessMessages = Collections.synchronizedSet(new HashSet<String>());
+
+ private boolean isTransacted = false;
+ private static final String RETRYTIME = "retrytime";
+ private static final int RETRY_DELAY = 10000;
+
/**
* disable default constructor
*/
@@ -136,17 +151,45 @@
}
}
+ Serializable serialisedMessage = null;
+ try {
+ serialisedMessage = Util.serialize(message);
+ } catch (Exception e) {
+ throw new CourierException("Failed to serialize message for storage in database table.", e);
+ }
+
+ CourierMessageRecord messageRecord = new CourierMessageRecord();
+ messageRecord.messageId = msgId;
+ messageRecord.message = serialisedMessage;
+ messageRecord.state = State.Pending;
+ messageRecord.timestamp = System.currentTimeMillis();
+ messageRecord.retryTime = messageRecord.timestamp;
+
+ return insertRecord(messageRecord, true);
+ }
+
+ protected boolean insertRecord(CourierMessageRecord messageRecord, boolean commit) throws CourierException {
while (_conn != null) {
try {
- int iCol = 1;
PreparedStatement PS = insertStatement();
- PS.setString(iCol++, msgId);
- PS.setObject(iCol++, Util.serialize(message));
- PS.setString(iCol++, State.Pending.getColumnValue());
- PS.setLong(iCol++, System.currentTimeMillis());
+ PS.setString(1, messageRecord.messageId);
+ PS.setObject(2, messageRecord.message);
+ PS.setString(3, messageRecord.state.toColumnValue());
+ PS.setLong(4, messageRecord.timestamp); // message timestamp
+
+ if(messageRecord.retryTime == 0) {
+ messageRecord.retryTime = System.currentTimeMillis() + RETRY_DELAY;
+ }
+
+ PS.setLong(5, messageRecord.retryTime); // retrytime
+
_conn.execUpdWait(PS, 3);
- _conn.commit();
+
+ if(commit) {
+ _conn.commit();
+ }
+
return true;
}
catch (SQLException e) {
@@ -166,11 +209,15 @@
jdbcConnectRetry(e);
}
}
+
return false;
- } // ________________________________
+ }
public Message pickup(long millis) throws CourierException, CourierTimeoutException {
- Message result = null;
+ throw new UnsupportedOperationException("Invalid call to untransacted pickup() method.");
+ }
+
+ public Message pickup(long millis, TransactedMessageDeliveryContext context) throws CourierException, CourierTimeoutException {
long limit = System.currentTimeMillis()
+ ((millis < 100) ? 100 : millis);
do {
@@ -178,23 +225,31 @@
ResultSet RS = getRowList();
while (null != RS && RS.next()) {
String messageId = RS.getString(1);
- if (null == (result = tryToPickup(messageId)))
+
+ // TODO: I don't actually think this is needed - the TM should make sure this can't happen (I hope:-) )
+ if(inProcessMessages.contains(messageId)) {
continue;
+ }
+ Message message = tryToPickup(messageId, context);
+ if (message == null) {
+ continue;
+ }
+ inProcessMessages.add(messageId);
+
/*
* If this is fault message, then throw an exception with the contents. With the
* exception of user-defined exceptions, faults will have nothing in the body, properties etc.
*/
- if (Type.isFaultMessage(result))
- Factory.createExceptionFromFault(result);
+ if (Type.isFaultMessage(message))
+ Factory.createExceptionFromFault(message);
- return result;
+ return message;
}
}
catch (SQLException e) {
_logger.debug("SQL Exception during pickup", e);
- return null;
}
finally {
// Added to make sure we release transactions from all paths
@@ -214,55 +269,70 @@
Thread.sleep(lSleep);
}
catch (InterruptedException e) {
- return null;
}
} while (System.currentTimeMillis() <= limit);
+
return null;
- } // ________________________________
+ }
- private Message tryToPickup(String messageId) throws CourierException,
- SQLException {
- int iParm = 1;
+ private Message tryToPickup(String messageId, TransactedMessageDeliveryContext context) throws CourierException, SQLException {
+ select4UpdateStatement().setString(1, messageId);
+ select4UpdateStatement().setString(2, State.Pending.toColumnValue());
+ select4UpdateStatement().setLong( 3, System.currentTimeMillis());
- select4UpdateStatement().setString(iParm++, messageId);
- select4UpdateStatement().setString(iParm++,
- State.Pending.getColumnValue());
+ if (_conn != null) {
+ try {
+ context.beginTransaction();
+ ResultSet resultSet = _conn.execQueryWait(select4UpdateStatement(), 3);
+ try {
+ while (resultSet.next()) {
+ Exception eBad = null;
+ CourierMessageRecord messageRecord = new CourierMessageRecord();
- while (_conn != null) {
- try {
- ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
- while (RS.next()) {
- Exception eBad = null;
- try {
- Message result = Util.deserialize((Serializable) RS
- .getObject(1));
- if (_postDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Done);
- return result;
+ messageRecord.messageId = messageId;
+ messageRecord.message = (Serializable) resultSet.getObject(1);
+ messageRecord.state = State.toState(resultSet.getString(2));
+ messageRecord.timestamp = resultSet.getLong(3);
+
+ try {
+ Message result = Util.deserialize((Serializable) resultSet.getObject(1));
+
+ SqlCourierRollbackTerminateStrategy rollbackTerminateStrategy = new SqlCourierRollbackTerminateStrategy(this, messageRecord, _postDelete);
+ context.setRollbackStrategy(rollbackTerminateStrategy);
+ context.setTerminateStrategy(rollbackTerminateStrategy);
+
+ if (_postDelete) {
+ deleteMsg(messageId);
+ } else {
+ changeStatus(messageId, State.Done);
+ }
+
+ // Transaction must be terminated by caller (MessageAwareListener)...
+ return result;
+ }
+ catch (ClassCastException e) {
+ eBad = e;
+ }
+ catch (SAXParseException e) {
+ eBad = e;
+ }
+ catch (Exception e) {
+ throw new CourierException(e);
+ }
+ if (null != eBad) {
+ if (_errorDelete)
+ deleteMsg(messageId);
+ else
+ changeStatus(messageId, State.Error);
+ continue;
+ }
}
- catch (ClassCastException e) {
- eBad = e;
- }
- catch (SAXParseException e) {
- eBad = e;
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
- if (null != eBad) {
- if (_errorDelete)
- deleteMsg(messageId);
- else
- changeStatus(messageId, State.Error);
- continue;
- }
+ } finally {
+ resultSet.close();
}
- return null;
}
catch (SQLException e) {
- throw new CourierException(e);
+ throw new CourierException("", e);
}
catch (Exception e) {
jdbcConnectRetry(e);
@@ -272,22 +342,47 @@
} // ________________________________
private void deleteMsg(String messageId) throws SQLException {
- int iParm = 1;
- deleteStatement().setString(iParm++, messageId);
- _conn.execUpdWait(deleteStatement(), 3);
- _conn.commit();
+ PreparedStatement deleteStatement = deleteStatement();
+ synchronized (deleteStatement) {
+ deleteStatement.setString(1, messageId);
+ int updateCount = _conn.execUpdWait(deleteStatement, 3);
+ if(_logger.isDebugEnabled()) {
+ _logger.debug("Delete update on message '" + messageId + "' updated " + updateCount + " records.");
+ }
+ }
+
+ if(!isTransacted) {
+ _conn.commit();
+ }
}
private void changeStatus(String messageId, State to) throws SQLException {
- int iParm = 1;
- updateStatusStatement().setString(iParm++, to.getColumnValue());
- updateStatusStatement().setString(iParm++, messageId);
- _conn.execUpdWait(updateStatusStatement(), 3);
- _conn.commit();
+ PreparedStatement updateStatement = updateStatusStatement();
+ synchronized (updateStatement) {
+ updateStatement.setString(1, to.toColumnValue());
+ if(to == State.Pending) {
+ updateStatement.setLong(2, System.currentTimeMillis() + RETRY_DELAY);
+ } else {
+ updateStatement.setLong(2, System.currentTimeMillis());
+ }
+ updateStatement.setString(3, messageId);
+ int updateCount = _conn.execUpdWait(updateStatement, 3);
+ if(_logger.isDebugEnabled()) {
+ _logger.debug("ChangeStatus update on message '" + messageId + "' updated " + updateCount + " records.");
+ }
+ }
+
+ if(!isTransacted) {
+ _conn.commit();
+ }
}
+ public void resetStatus(CourierMessageRecord messageRecord) throws SQLException {
+ changeStatus(messageRecord.messageId, messageRecord.state);
+ }
+
private ResultSet getRowList() throws CourierException {
if (null == _conn) {
try {
@@ -331,27 +426,40 @@
}
} // ________________________________
- private JdbcCleanConn getConn() throws SQLException, MalformedEPRException {
+ private JdbcCleanConn getConn() throws SQLException {
if (null == _conn) {
- try {
- DataSource DS = null;
- if (_epr.getDatasource() == null) {
- DS = new SimpleDataSource(_epr.getDriver(),
- _epr.getURL(), _epr.getUserName(), _epr.getPassword());
+ DataSource DS = null;
+
+ if (_epr.getDatasource() == null) {
+ try {
+ DS = new SimpleDataSource(_epr.getDriver(), _epr.getURL(), _epr.getUserName(), _epr.getPassword());
+ isTransacted = false;
+ _logger.warn("SQL provider config for provider implementation '" + _epr.getDriver() + "' is not based on a Datasource. Transactions will not be supported.");
+ } catch (URISyntaxException ex) {
+ throw new RuntimeException("Unexpected exception out of JDBCEpr getter methods.", ex);
+ }
+ } else {
+ InitialContext initContext;
+ try {
+ initContext = new InitialContext();
+ DS = (DataSource) initContext.lookup(_epr.getDatasource());
+ } catch (NamingException e) {
+ SQLException sqlE = new SQLException("Datasource '" + _epr.getDatasource() + "' lookup failure.");
+ sqlE.initCause(e);
+ throw sqlE;
+ }
+ }
+ _conn = new JdbcCleanConn(DS);
+ if(!(DS instanceof SimpleDataSource)) {
+ DatabaseMetaData dbMetaData = _conn.getConnection().getMetaData();
+ if(!dbMetaData.supportsTransactions()) {
+ isTransacted = false;
+ _logger.warn("The configured Datasource '" + _epr.getDatasource() + "' Provider implementation '" + dbMetaData.getDatabaseProductName() + " (" + dbMetaData.getDriverName() + " - " + dbMetaData.getDriverVersion() + ")' does not support transactions.");
} else {
- InitialContext initContext;
- try {
- initContext = new InitialContext();
- DS = (DataSource) initContext.lookup(_epr.getDatasource());
- } catch (NamingException e) {
- _logger.error("", e);
- }
+ isTransacted = true;
+ _logger.info("The configured Datasource '" + _epr.getDatasource() + "' Provider implementation '" + dbMetaData.getDatabaseProductName() + " (" + dbMetaData.getDriverName() + " - " + dbMetaData.getDriverVersion() + ")' supports transactions.");
}
- _conn = new JdbcCleanConn(DS);
}
- catch (URISyntaxException ex) {
- throw new MalformedEPRException(ex);
- }
}
return _conn;
} // ________________________________
@@ -369,7 +477,7 @@
sb.append((i1++ < 1) ? " " : ",").append(col);
sb.append(" from ").append(_epr.getTableName());
sb.append(" where ").append(_epr.getStatusColumn())
- .append("='").append(State.Pending.getColumnValue())
+ .append("='").append(State.Pending.toColumnValue())
.append("'").append(" order by 2");
_prepGetList = getConn().prepareStatement(sb.toString());
}
@@ -380,45 +488,51 @@
return _prepGetList;
} // ________________________________
- protected PreparedStatement select4UpdateStatement() {
+ protected PreparedStatement select4UpdateStatement() throws CourierException, SQLException {
if (_prepSel4Upd == null) {
+ JdbcCleanConn cleanConnection = getConn();
+ StringBuilder sb = null;
+
try {
- /*
- * TODO make this dynamic using a factory pattern.
- */
+ sb = new StringBuilder("select ").append(
+ _epr.getDataColumn()).append(", ").append(
+ _epr.getStatusColumn()).append(", ").append(
+ _epr.getTimestampColumn()).append(
+ " from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append("= ?").append(
+ " and ").append(_epr.getStatusColumn()).append("= ?").append(
+ " and ").append(" " + RETRYTIME + " < ?");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unexpected exception out of JDBCEpr getter methods.", e);
+ }
- StringBuilder sb = null;
+ try {
+ if(cleanConnection.getConnection().getMetaData().supportsSelectForUpdate()) {
+ sb.append(" for update");
+ }
+ _prepSel4Upd = cleanConnection.prepareStatement(sb.toString());
+ } catch (SQLException e) {
+ throw new CourierException("Unable to create prepared statement.", e);
+ }
+ }
- if (!_epr.getURL().contains("hsqldb")) {
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?").append(" for update");
- } else {
- /*
- * HSQL does not support FOR UPDATE! All tables appear to
- * be inherently updatable!
- */
+ return _prepSel4Upd;
+ } // ________________________________
- sb = new StringBuilder("select ").append(
- _epr.getDataColumn()).append(" from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append("=?").append(
- " and ").append(_epr.getStatusColumn())
- .append("=?");
- }
-
- _prepSel4Upd = getConn().prepareStatement(sb.toString());
+ protected PreparedStatement deleteStatement() {
+ if (null == _prepDelete)
+ try {
+ StringBuilder sb = new StringBuilder("delete from ").append(
+ _epr.getTableName()).append(" where ").append(
+ _epr.getMessageIdColumn()).append(" = ?");
+ _prepDelete = getConn().prepareStatement(sb.toString());
}
catch (Exception e) {
_logger.debug(e);
return null;
}
- }
-
- return _prepSel4Upd;
+ return _prepDelete;
} // ________________________________
protected PreparedStatement updateStatusStatement() {
@@ -426,7 +540,8 @@
try {
StringBuilder sb = new StringBuilder("update ").append(
_epr.getTableName()).append(" set ").append(
- _epr.getStatusColumn()).append("= ?").append(" where ")
+ _epr.getStatusColumn()).append("= ?, ").append(
+ RETRYTIME).append("= ?").append(" where ")
.append(_epr.getMessageIdColumn()).append("=?");
_prepUpdateStatus = getConn().prepareStatement(sb.toString());
}
@@ -442,14 +557,14 @@
try {
String[] columns =
{_epr.getMessageIdColumn(), _epr.getDataColumn(),
- _epr.getStatusColumn(), _epr.getTimestampColumn()};
+ _epr.getStatusColumn(), _epr.getTimestampColumn(), RETRYTIME};
StringBuilder sb = new StringBuilder("insert into ").append(
_epr.getTableName()).append("(");
int i1 = 0;
for (String col : columns)
sb.append((i1++ < 1) ? " " : ",").append(col);
- sb.append(") values (?,?,?,?)");
+ sb.append(") values (?,?,?,?,?)");
_prepInsert = getConn().prepareStatement(sb.toString());
}
catch (Exception e) {
@@ -459,27 +574,26 @@
return _prepInsert;
} // ________________________________
- protected PreparedStatement deleteStatement() {
- if (null == _prepDelete)
- try {
- StringBuilder sb = new StringBuilder("delete from ").append(
- _epr.getTableName()).append(" where ").append(
- _epr.getMessageIdColumn()).append(" =?");
- _prepDelete = getConn().prepareStatement(sb.toString());
- }
- catch (Exception e) {
- _logger.debug(e);
- return null;
- }
- return _prepDelete;
- } // ________________________________
-
protected enum State {
Pending, WorkInProgress, Done, Error;
- String getColumnValue() {
+ String toColumnValue() {
return toString().substring(0, 1);
}
+
+ static State toState(String columnValue) {
+ if(columnValue.equals("P")) {
+ return State.Pending;
+ } else if(columnValue.equals("W")) {
+ return State.WorkInProgress;
+ } else if(columnValue.equals("D")) {
+ return State.Done;
+ } else if(columnValue.equals("E")) {
+ return State.Error;
+ }
+
+ throw new IllegalArgumentException("Invalid State column value '" + columnValue + "'.");
+ }
}
public void setPollLatency(Long millis) {
@@ -489,6 +603,23 @@
_pollLatency = millis;
} // ________________________________
+ public static class CourierMessageRecord {
+
+ private String messageId;
+ private Serializable message;
+ private State state;
+ private long timestamp;
+ private long retryTime;
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public String toString() {
+ return messageId + ":" + timestamp;
+ }
+ }
+
protected long _pollLatency = 200;
protected long _sleepForRetries = 3000; // milliseconds
Added: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TransactedPickUpOnlyCourier.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TransactedPickUpOnlyCourier.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TransactedPickUpOnlyCourier.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers;
+
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+
+/**
+ * Transacted message pickup courier.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface TransactedPickUpOnlyCourier extends PickUpOnlyCourier {
+
+ public Message pickup(long millis, org.jboss.internal.soa.esb.message.tx.TransactedMessageDeliveryContext context) throws CourierException, CourierTimeoutException;
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TransactedPickUpOnlyCourier.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -79,7 +79,7 @@
DeliverOnlyCourier old = _deliverCourier;
try
{
- _deliverCourier = getDeliverCourier(toEPR);
+ _deliverCourier = createDeliverCourier(toEPR);
_toEPR = toEPR ;
}
finally
@@ -97,7 +97,7 @@
PickUpOnlyCourier old = _pickupCourier;
try
{
- _pickupCourier = getPickupCourier(replyToEPR);
+ _pickupCourier = createPickupCourier(replyToEPR);
}
finally
{
@@ -105,14 +105,18 @@
}
}
- private DeliverOnlyCourier getDeliverCourier(EPR toEPR)
+ public PickUpOnlyCourier getPickUpOnlyCourier() {
+ return _pickupCourier;
+ }
+
+ private DeliverOnlyCourier createDeliverCourier(EPR toEPR)
throws CourierException, MalformedEPRException
{
return (null == toEPR) ? null : (DeliverOnlyCourier) courierFromEpr(
toEPR, false);
}
- private PickUpOnlyCourier getPickupCourier(EPR replyToEPR)
+ private PickUpOnlyCourier createPickupCourier(EPR replyToEPR)
throws CourierException, MalformedEPRException
{
return (null == replyToEPR) ? null
@@ -212,7 +216,7 @@
public Message pickup(long waitTime, EPR epr) throws CourierException,
CourierTimeoutException, MalformedEPRException
{
- return pickup(waitTime, getPickupCourier(epr));
+ return pickup(waitTime, createPickupCourier(epr));
}
private Message pickup(long waitTime, PickUpOnlyCourier courier)
Added: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/RollbackStrategy.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/RollbackStrategy.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/RollbackStrategy.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.message.tx;
+
+import org.jboss.soa.esb.common.TransactionStrategyException;
+
+/**
+ * Transaction Rollback Strategy.
+ * <p/>
+ *
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface RollbackStrategy {
+
+ /**
+ * Execute the transaction "rollback" strategy.
+ *
+ * @throws TransactionStrategyException Exception while executing the strategy.
+ */
+ public abstract void executeRollbackStrategy() throws TransactionStrategyException;
+
+ /**
+ * Should the underlying transaction be rolledback.
+ * <p/>
+ * Basically, should rollback really be called on the transaction.
+ *
+ * @return True if the transaction is to be rolled back, otherwise false.
+ */
+ public abstract boolean executeRollback();
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/RollbackStrategy.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TerminateStrategy.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TerminateStrategy.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TerminateStrategy.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.message.tx;
+
+import org.jboss.soa.esb.common.TransactionStrategyException;
+
+/**
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface TerminateStrategy {
+
+ /**
+ * Execute the transaction "terminate" strategy.
+ *
+ * @throws org.jboss.soa.esb.common.TransactionStrategyException Exception while executing the strategy.
+ */
+ public abstract void executeTerminateStrategy() throws TransactionStrategyException;
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TerminateStrategy.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TransactedMessageDeliveryContext.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TransactedMessageDeliveryContext.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TransactedMessageDeliveryContext.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.message.tx;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+
+import javax.transaction.Transaction;
+import java.util.UUID;
+
+/**
+ * Context object associating a message delivered by courier, with
+ * the transaction associated with that message delivery.
+ * <p/>
+ * Used to help make message delivery and processing transactional.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class TransactedMessageDeliveryContext {
+
+ private static Logger logger = Logger.getLogger(TransactedMessageDeliveryContext.class);
+
+ private TransactionStrategy strategy;
+ private boolean transactionStarted = false;
+ private Transaction transaction = null;
+ private String transactionLogId;
+ private RollbackStrategy rollbackStrategy;
+ private TerminateStrategy terminateStrategy;
+
+ public TransactedMessageDeliveryContext(TransactionStrategy strategy) {
+ this.strategy = strategy;
+ if(logger.isDebugEnabled()) {
+ transactionLogId = UUID.randomUUID().toString();
+ }
+ }
+
+ public boolean isTransactionStarted() {
+ return transactionStarted;
+ }
+
+ public void beginTransaction() throws TransactionStrategyException {
+ if(!transactionStarted) {
+ strategy.begin();
+ transactionStarted = true;
+ logDebug("Message delivery transaction started.");
+ }
+ }
+
+ public void suspendTransaction() throws TransactionStrategyException {
+ if(transactionStarted) {
+ transaction = strategy.suspend();
+ logDebug("Message delivery transaction suspended.");
+ }
+ }
+
+ public void resumeTransaction() throws TransactionStrategyException {
+ if(transactionStarted && transaction != null) {
+ strategy.resume(transaction);
+ transaction = null;
+ logDebug("Message delivery transaction resumed.");
+ }
+ }
+
+ public void rollbackTransaction() throws TransactionStrategyException {
+ if(transactionStarted) {
+ if(rollbackStrategy != null) {
+ try {
+ rollbackStrategy.executeRollbackStrategy();
+ } finally {
+ if(rollbackStrategy.executeRollback()) {
+ strategy.rollbackOnly();
+ logDebug("Message delivery transaction rolled back.");
+ } else {
+ logDebug("Not executing rollback on TransactionStrategy.");
+ }
+ }
+ } else {
+ strategy.rollbackOnly();
+ logDebug("Message delivery transaction rolled back.");
+ }
+ }
+ }
+
+ public void terminateTransaction() throws TransactionStrategyException {
+ if(transactionStarted) {
+ try {
+ if(terminateStrategy != null) {
+ terminateStrategy.executeTerminateStrategy();
+ }
+ } finally {
+ transactionStarted = false;
+ strategy.terminate();
+ }
+ logDebug("Message delivery transaction terminated.");
+ }
+ }
+
+ public void setRollbackStrategy(RollbackStrategy rollbackStrategy) {
+ this.rollbackStrategy = rollbackStrategy;
+ }
+
+ public void setTerminateStrategy(TerminateStrategy terminateStrategy) {
+ this.terminateStrategy = terminateStrategy;
+ }
+
+ private void logDebug(String message) {
+ if(logger.isDebugEnabled()) {
+ Exception e = new Exception();
+ logger.debug("[" + transactionLogId + "] " + message + " Thread '" + Thread.currentThread().getName() + "'. Caller: '" + e.getStackTrace()[2] + "'");
+ }
+ }
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/internal/soa/esb/message/tx/TransactedMessageDeliveryContext.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -24,6 +24,7 @@
import javax.transaction.Status;
import javax.transaction.TransactionManager;
+import javax.transaction.Transaction;
import org.apache.log4j.Logger;
import org.jboss.system.ServiceMBeanSupport;
@@ -123,7 +124,33 @@
throw new TransactionStrategyException("Failed to begin transaction on current thread", th) ;
}
}
-
+
+ public Transaction suspend() throws TransactionStrategyException {
+ try
+ {
+ LOGGER.debug("Suspending transaction on current thread") ;
+ return tm.suspend();
+ }
+ catch (final Throwable th)
+ {
+ LOGGER.debug("Failed to suspend transaction on current thread", th) ;
+ throw new TransactionStrategyException("Failed to suspend transaction on current thread", th) ;
+ }
+ }
+
+ public void resume(Transaction transaction) throws TransactionStrategyException {
+ try
+ {
+ LOGGER.debug("Resuming transaction on current thread") ;
+ tm.resume(transaction);
+ }
+ catch (final Throwable th)
+ {
+ LOGGER.debug("Failed to resume transaction on current thread", th) ;
+ throw new TransactionStrategyException("Failed to resume transaction on current thread", th) ;
+ }
+ }
+
/**
* Terminate the transaction on the current thread.
* If the transaction has been marked for rollback then it
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -22,6 +22,8 @@
package org.jboss.soa.esb.common;
+import javax.transaction.Transaction;
+
/**
* This class represents the transaction strategy that is currently in force
* within the ESB. At present there are two strategies employed, a null strategy
@@ -72,9 +74,23 @@
* @throws TransactionStrategyException
*/
public abstract void begin()
- throws TransactionStrategyException ;
-
+ throws TransactionStrategyException ;
+
/**
+ * Suspend the transaction on the current thread.
+ * @throws TransactionStrategyException
+ */
+ public abstract Transaction suspend()
+ throws TransactionStrategyException ;
+
+ /**
+ * Resume the transaction on the current thread.
+ * @throws TransactionStrategyException
+ */
+ public abstract void resume(Transaction transaction)
+ throws TransactionStrategyException ;
+
+ /**
* Terminate the transaction on the current thread.
* If the transaction has been marked for rollback then it
* will be rolled back, otherwise it will be committed.
@@ -104,7 +120,14 @@
throws TransactionStrategyException
{
}
-
+
+ public Transaction suspend() throws TransactionStrategyException {
+ return null;
+ }
+
+ public void resume(Transaction transaction) throws TransactionStrategyException {
+ }
+
/**
* Terminate the transaction on the current thread.
* If the transaction has been marked for rollback then it
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -48,8 +48,16 @@
m_oDS = p_oDS;
m_oLogger = Logger.getLogger(this.getClass());
}
-
- public void commit() throws SQLException
+
+ public Connection getConnection() throws SQLException {
+ if (m_conn == null) {
+ connect();
+ }
+
+ return m_conn;
+ }
+
+ public void commit() throws SQLException
{
if (null != m_conn)
{
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -259,22 +259,32 @@
public static JDBCEpr jdbcEprFromElement(ConfigTree tree) throws ConfigurationException
{
- String url = tree.getRequiredAttribute(JDBCEpr.URL_TAG);
- if (!url.toLowerCase().startsWith("jdbc"))
- throw new ConfigurationException("URL in "
- + ListenerTagNames.URL_TAG + " must be a jdbc URL");
+ String protocol = tree.getRequiredAttribute(ListenerTagNames.PROTOCOL_TAG);
+ String url = tree.getAttribute(JDBCEpr.URL_TAG, "jdbc://url-unset");
- try
+ if (!protocol.equalsIgnoreCase(JDBCEpr.JDBC_PROTOCOL)) {
+ throw new ConfigurationException("EPR Config protocol no 'jdbc'.");
+ }
+
+ try
{
- boolean bPostDel = Boolean.valueOf(tree.getAttribute(
- JDBCEpr.POST_DEL_TAG, "true"));
- boolean bErrorDel = Boolean.valueOf(tree.getAttribute(
- JDBCEpr.ERROR_DEL_TAG, "true"));
+ String datasource = tree.getAttribute(JDBCEpr.DATASOURCE_TAG);
+ String driver = tree.getAttribute(JDBCEpr.DRIVER_TAG);
+ boolean bPostDel = tree.getBooleanAttribute(JDBCEpr.POST_DEL_TAG, true);
+ boolean bErrorDel = tree.getBooleanAttribute(JDBCEpr.ERROR_DEL_TAG, true);
+
JDBCEpr epr = new JDBCEpr(url, bPostDel, bErrorDel);
- epr.setDriver(tree.getRequiredAttribute(JDBCEpr.DRIVER_TAG));
- epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
- epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
- epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
+
+ if(datasource != null) {
+ epr.setDatasource(datasource);
+ } else if(driver != null) {
+ epr.setDriver(driver);
+ epr.setUserName(getAttrAndWarn(tree, JDBCEpr.USERNAME_TAG, ""));
+ epr.setPassword(getAttrAndWarn(tree, JDBCEpr.PASSWORD_TAG, ""));
+ } else {
+ throw new ConfigurationException("JDBC EPR Config must define a 'datasource' or 'driver' config.");
+ }
+ epr.setTableName(tree.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG));
epr.setMessageIdColumn(getColName(tree, JDBCEpr.MESSAGE_ID_COLUMN_TAG));
epr.setStatusColumn(getColName(tree, JDBCEpr.STATUS_COLUMN_TAG));
epr.setDataColumn(getColName(tree, JDBCEpr.DATA_COLUMN_TAG));
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/SqlListenerMapper.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -126,6 +126,7 @@
}
private static void mapSqlEprProperties(Element toElement, SqlProvider provider, SqlMessageFilter messageFilter) {
+ toElement.setAttribute(ListenerTagNames.PROTOCOL_TAG, JDBCEpr.JDBC_PROTOCOL);
toElement.setAttribute(JDBCEpr.DATASOURCE_TAG, provider.getDatasource());
toElement.setAttribute(JDBCEpr.URL_TAG, provider.getUrl());
toElement.setAttribute(JDBCEpr.DRIVER_TAG, provider.getDriver());
@@ -140,7 +141,5 @@
toElement.setAttribute(JDBCEpr.TIMESTAMP_COLUMN_TAG, messageFilter.getInsertTimestampColumn());
toElement.setAttribute(JDBCEpr.POST_DEL_TAG, String.valueOf(messageFilter.getPostDelete()));
toElement.setAttribute(JDBCEpr.ERROR_DEL_TAG, String.valueOf(messageFilter.getErrorDelete()));
-
-
}
}
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessorMethodInfo.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessorMethodInfo.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessorMethodInfo.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -123,6 +123,8 @@
if (th instanceof ActionProcessingException)
{
throw (ActionProcessingException)th ;
+ } else if (th instanceof RuntimeException) {
+ throw (RuntimeException) th;
}
else if (th instanceof RuntimeException)
{
Modified: labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-12-03 10:51:36 UTC (rev 16981)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -22,22 +22,17 @@
package org.jboss.soa.esb.listeners.message;
-import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
+import org.jboss.internal.soa.esb.message.tx.TransactedMessageDeliveryContext;
+import org.jboss.internal.soa.esb.couriers.TransactedPickUpOnlyCourier;
+import org.jboss.internal.soa.esb.couriers.TwoWayCourierImpl;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.couriers.FaultMessageException;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.couriers.*;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -49,6 +44,11 @@
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
/**
* Esb Message aware transport independent listener. <p/> Relies on the
* CourierFactory to obtain an appropriate Courier for the EPR this listener
@@ -206,8 +206,20 @@
throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
}
- _pickUpCourier = pickUpCourier ;
+ final boolean transacted = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
+ transactionStrategy = TransactionStrategy.getTransactionStrategy(transacted) ;
+ pipeline.setTransactional(transacted);
+ // TODO: The courier code is completely mixed up.
+ // Amoung other things, the TwoWayCourier appears to be an effort to associate a delivery and
+ // pickup courier through a common interface. A common interface is not the way to do this and leads to
+ // the following type of hack in order to get at the actual/real courier....
+ if(pickUpCourier instanceof TwoWayCourierImpl) {
+ _pickUpCourier = ((TwoWayCourierImpl)pickUpCourier).getPickUpOnlyCourier();
+ } else {
+ _pickUpCourier = pickUpCourier ;
+ }
+
try
{
RegistryUtil.register(_config, _epr);
@@ -263,56 +275,125 @@
public void waitForEventAndProcess (long maxWaitMillis)
{
- Message message = null ;
- try
- {
- message = (maxWaitMillis > 0) ? _pickUpCourier
- .pickup(maxWaitMillis) : null;
- errorDelay = 0 ;
- }
- catch (CourierTimeoutException e)
- {
- return;
- }
- catch (FaultMessageException fme)
- {
- message = fme.getReturnedMessage() ;
- }
- catch (CourierException e)
- {
- _logger.debug("Courier Exception", e);
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
+ TransactedMessageDeliveryContext context = new TransactedMessageDeliveryContext(transactionStrategy);
+ Message message = null;
+
+ try {
+ message = pickupMessage(maxWaitMillis, context);
+ } catch (Throwable t) {
+ try {
+ context.rollbackTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to rollback transaction.", e);
+ }
+ return;
+ } finally {
+ if (message != null) {
+ // If a transaction is started, it will be suspended, restarted and terminated on
+ // the process thread...
+ processMessage(message, context);
+ } else {
+ // Make sure any started transactions are terminated...
+ try {
+ context.terminateTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to terminate transaction.", e);
+ }
+ }
+ }
+ } // ________________________________
+
+ private Message pickupMessage(long maxWaitMillis, TransactedMessageDeliveryContext xaContext) {
+ try {
+ Message message = null;
+
+ if(maxWaitMillis > 0) {
+ if(_pickUpCourier instanceof TransactedPickUpOnlyCourier) {
+ message = ((TransactedPickUpOnlyCourier)_pickUpCourier).pickup(maxWaitMillis, xaContext);
+ } else {
+ message = _pickUpCourier.pickup(maxWaitMillis);
+ }
+ }
+ errorDelay = 0;
+
+ return message;
+ } catch (CourierTimeoutException e) {
+ rollbackAndTerminate(xaContext);
+ return null;
+ }catch (FaultMessageException fme) {
+ return fme.getReturnedMessage();
+ } catch (CourierException e) {
+ rollbackAndTerminate(xaContext);
+ _logger.debug("Courier Exception", e);
+ if (errorDelay == 0) {
+ errorDelay = MIN_ERROR_DELAY;
+ } else if (errorDelay < MAX_ERROR_DELAY) {
+ errorDelay <<= 1;
+ }
+ _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds. Error: " + e.getMessage());
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay);
+ return null;
+ }
+ }
+
+ private void processMessage(Message message, final org.jboss.internal.soa.esb.message.tx.TransactedMessageDeliveryContext context) {
+ final Message pipelineMessage = message ;
+ final Runnable pipelineRunner = new Runnable() {
+ public void run() {
+ try {
+ context.resumeTransaction();
+
+ try {
+ pipeline.process(pipelineMessage) ;
+ } catch (Throwable t) {
+ try {
+ context.rollbackTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to rollback transaction.", e);
}
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
- return;
- }
-
- if (null != message)
- {
- final Message pipelineMessage = message ;
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(pipelineMessage) ;
- } finally {
- updateThreadCount(-1) ;
+ } finally {
+ try {
+ if(context != null) {
+ try {
+ context.terminateTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to terminate transaction.", e);
+ }
}
+ } finally {
+ updateThreadCount(-1) ;
}
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
- }
+ }
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to resume transaction.", e);
+ }
+ }
+ } ;
- } // ________________________________
+ try {
+ updateThreadCount(+1);
+ context.suspendTransaction();
+ _execService.execute(pipelineRunner);
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to suspend transaction.", e);
+ }
+ }
- /**
+ private void rollbackAndTerminate(org.jboss.internal.soa.esb.message.tx.TransactedMessageDeliveryContext context) {
+ try {
+ context.rollbackTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to rollback transaction.", e);
+ } finally {
+ try {
+ context.terminateTransaction();
+ } catch (TransactionStrategyException e) {
+ _logger.error("Failed to terminate transaction.", e);
+ }
+ }
+ }
+
+ /**
* Handle the threaded destroy of the managed instance.
*
* @throws ManagedLifecycleException for errors while destroying.
@@ -385,7 +466,8 @@
{
synchronized (_synchThreads)
{
- _qRunningThreads += i.intValue();
+ _qRunningThreads += i;
+ System.out.println("Running Threads: " + _qRunningThreads + " - " + _pickUpCourier.getClass().getName());
if (_qRunningThreads < _maxThreads)
{
_synchThreads.notifyAll() ;
@@ -407,7 +489,7 @@
private long _latencySecs;
- private long _pauseLapseInMillis = 50 ;
+ private long _pauseLapseInMillis = 200;
private ExecutorService _execService;
@@ -418,4 +500,9 @@
private Logger _logger = Logger.getLogger(MessageAwareListener.class);
private PickUpOnlyCourier _pickUpCourier;
+
+ /**
+ * The current transaction strategy
+ */
+ private TransactionStrategy transactionStrategy ;
}
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/build.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/build.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/build.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,75 @@
+<project name="Quickstart_helloworld_SQL2_action" default="run" basedir=".">
+
+ <description>
+ ${ant.project.name}
+ ${line.separator}
+ </description>
+
+ <property name="additional.deploys" value="jbossesb-service.xml" />
+
+ <!-- Import the base Ant build script... -->
+ <import file="../conf/base-build.xml"/>
+
+ <target name="quickstart-specific-assemblies" description="Quickstart specific assemblies">
+ </target>
+
+ <target name="quickstart-specific-deploys" description="Quickstart specific deploys">
+ <copy file="quickstart-ds.xml"
+ todir="${org.jboss.esb.server.deploy.dir}"
+ overwrite="false"/>
+ </target>
+
+ <target name="runtest" depends="compile" description="Send a message to the JMS Topic">
+ <java fork="yes" classname="org.jboss.soa.esb.testutils.JMSUtil" failonerror="true" classpathref="exec-classpath">
+ <arg value="queue/A"/>
+ <arg value="queue"/>
+ <arg value="Hello"/>
+ </java>
+ </target>
+
+ <!-- target name="runtest" depends="dependencies" description="Insert row data into sql table polled by gateway">
+ <property name="hsqldb.jar"
+ value="${org.jboss.esb.server.home}/server/${org.jboss.esb.server.config}/lib/hsqldb.jar"/>
+ <echo>Insert row data into sql table polled by gateway</echo>
+ <sql
+ driver="org.hsqldb.jdbcDriver"
+ url="jdbc:hsqldb:hsql://localhost:1703"
+ userid="sa"
+ autocommit="true"
+ password="">
+ <classpath>
+ <pathelement path="${hsqldb.jar}"/>
+ </classpath>
+ <transaction src="populate.sql"/>
+ </sql>
+ </target -->
+
+ <target name="select" depends="dependencies" description="select * from gateway_table">
+ <property name="hsqldb.jar"
+ value="${org.jboss.esb.server.home}/server/${org.jboss.esb.server.config}/lib/hsqldb.jar"/>
+ <echo>Select * from gateway_table</echo>
+ <sql
+ print="true"
+ driver="org.hsqldb.jdbcDriver"
+ url="jdbc:hsqldb:hsql://localhost:1703"
+ userid="sa"
+ autocommit="true"
+ password="">
+ <classpath>
+ <pathelement path="${hsqldb.jar}"/>
+ </classpath>
+ <transaction>
+ select * from gateway_table
+ </transaction>
+ </sql>
+ </target>
+
+ <target name="deploy-jms-dests">
+ <echo message="This quickstart doesn't use any JMS Destinations. No JMS deployments required." />
+ </target>
+
+ <target name="undeploy-jms-dests">
+ <echo message="This quickstart doesn't use any JMS Destinations. No JMS undeployments required." />
+ </target>
+
+</project>
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/build.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jboss-esb.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jboss-esb.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jboss-esb.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,61 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd" parameterReloadSecs="5">
+
+ <providers>
+ <sql-provider name="SQLprovider"
+ datasource="java:/QuickstartDB">
+ <sql-bus busid="helloSQLChannel" >
+ <sql-message-filter
+ tablename="GATEWAY_TABLE"
+ message-column="data_column"
+ status-column="STATUS_COL"
+ order-by="DATA_COLUMN"
+ where-condition="DATA_COLUMN like 'data%'"
+ message-id-column="UNIQUE_ID"
+ />
+ </sql-bus>
+ </sql-provider>
+
+ <jms-provider name="JBossMQ"
+ connection-factory="ConnectionFactory"
+ jndi-context-factory="org.jnp.interfaces.NamingContextFactory"
+ jndi-URL="localhost" >
+
+ <jms-bus busid="quickstartEsbChannel">
+ <jms-message-filter
+ dest-type="QUEUE"
+ dest-name="queue/A"
+ />
+ </jms-bus>
+
+ </jms-provider>
+
+ </providers>
+
+ <services>
+ <service
+ category="myCategory"
+ name="myJmsListener"
+ description="Hello World SQL Action (esb jdbc listener)">
+ <listeners>
+ <jms-listener name="helloWorldJmsAction"
+ busidref="quickstartEsbChannel"
+ maxThreads="1"
+ is-gateway="true"/>
+ <sql-listener name="SqlGateway" busidref="helloSQLChannel" maxThreads="1">
+ <property name="transacted" value="true" />
+ </sql-listener>
+ </listeners>
+ <actions mep="OneWay">
+ <action name="action1"
+ class="org.jboss.soa.esb.samples.quickstart.helloworldsqlaction.MyAction"
+ process="displayMessage"
+ />
+ <action name="action2" class="org.jboss.soa.esb.actions.SystemPrintln" />
+ </actions>
+
+ </service>
+ </services>
+
+</jbossesb>
+
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jboss-esb.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-properties.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-properties.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-properties.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ JBoss, Home of Professional Open Source
+ Copyright 2006, JBoss Inc., and others contributors as indicated
+ by the @authors tag. All rights reserved.
+ See the copyright.txt in the distribution for a
+ full listing of individual contributors.
+ This copyrighted material is made available to anyone wishing to use,
+ modify, copy, or redistribute it subject to the terms and conditions
+ of the GNU Lesser General Public License, v. 2.1.
+ This program is distributed in the hope that it will be useful, but WITHOUT A
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ You should have received a copy of the GNU Lesser General Public License,
+ v.2.1 along with this distribution; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ MA 02110-1301, USA.
+
+ (C) 2005-2006,
+ @author JBoss Inc.
+-->
+<!-- $Id: jbossesb-unittest-properties.xml $ -->
+<!--
+ These options are described in the JBossESB manual.
+ Defaults are provided here for convenience only.
+
+ Please read through this file prior to using the system, and consider
+ updating the specified entries.
+-->
+<esb
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="jbossesb-1_0.xsd">
+ <properties name="core">
+ <property name="org.jboss.soa.esb.jndi.server.type" value="jboss"/>
+ <property name="org.jboss.soa.esb.jndi.server.url" value="localhost"/>
+ <property name="org.jboss.soa.esb.persistence.connection.factory" value="org.jboss.internal.soa.esb.persistence.format.MessageStoreFactoryImpl"/>
+ </properties>
+ <properties name="registry">
+ <property name="org.jboss.soa.esb.registry.queryManagerURI"
+ value="jnp://localhost:1099/InquiryService?org.apache.juddi.registry.rmi.Inquiry#inquire"/>
+ <property name="org.jboss.soa.esb.registry.lifeCycleManagerURI"
+ value="jnp://localhost:1099/PublishService?org.apache.juddi.registry.rmi.Publish#publish" />
+ <property name="org.jboss.soa.esb.registry.implementationClass"
+ value="org.jboss.internal.soa.esb.services.registry.JAXRRegistryImpl"/>
+ <property name="org.jboss.soa.esb.registry.factoryClass"
+ value="org.apache.ws.scout.registry.ConnectionFactoryImpl"/>
+ <property name="org.jboss.soa.esb.registry.user"
+ value="jbossesb"/>
+ <property name="org.jboss.soa.esb.registry.password"
+ value="password"/>
+ <!-- the following parameter is scout specific to set the type of communication between scout and the UDDI (embedded, rmi, soap) -->
+ <property name="org.jboss.soa.esb.scout.proxy.transportClass"
+ value="org.apache.ws.scout.transport.RMITransport"/>
+ </properties>
+ <properties name="transports" depends="core">
+ <property name="org.jboss.soa.esb.mail.smtp.host" value="localhost"/>
+ <property name="org.jboss.soa.esb.mail.smtp.user" value="jbossesb"/>
+ <property name="org.jboss.soa.esb.mail.smtp.password" value=""/>
+ <property name="org.jboss.soa.esb.mail.smtp.port" value="25"/>
+ </properties>
+ <properties name="connection">
+ <property name="min-pool-size" value="5"/>
+ <property name="max-pool=size" value="10"/>
+ <property name="blocking-timeout-millis" value="5000"/>
+ <property name="abandoned-connection-timeout" value="10000"/>
+ <property name="abandoned-connection-time-interval" value="30000"/>
+ </properties>
+ <properties name="dbstore">
+ <property name="org.jboss.soa.esb.persistence.db.connection.url" value="jdbc:hsqldb:hsql://localhost:9001/"/>
+ <property name="org.jboss.soa.esb.persistence.db.jdbc.driver" value="org.hsqldb.jdbcDriver"/>
+ <property name="org.jboss.soa.esb.persistence.db.user" value="sa"/>
+ <property name="org.jboss.soa.esb.persistence.db.pwd" value=""/>
+ <property name="org.jboss.soa.esb.persistence.db.pool.initial.size" value="2"/>
+ <property name="org.jboss.soa.esb.persistence.db.pool.min.size" value="2"/>
+ <property name="org.jboss.soa.esb.persistence.db.pool.max.size" value="5"/>
+ <!--table managed by pool to test for valid connections - created by pool automatically -->
+ <property name="org.jboss.soa.esb.persistence.db.pool.test.table" value="pooltest"/>
+ <!-- # of milliseconds to timeout waiting for a connection from pool -->
+ <property name="org.jboss.soa.esb.persistence.db.pool.timeout.millis" value="5000"/>
+ <property name="org.jboss.soa.esb.persistence.db.conn.manager" value="org.jboss.internal.soa.esb.persistence.manager.StandaloneConnectionManager"/>
+ </properties>
+ <properties name="messagerouting">
+ <property name="org.jboss.soa.esb.routing.cbrClass" value="org.jboss.internal.soa.esb.services.routing.cbr.JBossRulesRouter"/>
+ </properties>
+</esb>
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-properties.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-service.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-service.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-service.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<server>
+ <mbean code="org.jboss.internal.soa.esb.dependencies.DatabaseInitializer"
+ name="jboss.esb:service=QuickstartDatabaseInitializer">
+ <attribute name="Datasource">java:/QuickstartDB</attribute>
+ <attribute name="ExistsSql">select * from gateway_table</attribute>
+ <attribute name="SqlFiles">
+ hsqldb/create.sql
+ </attribute>
+ <depends>jboss.jca:name=QuickstartDB,service=DataSourceBinding</depends>
+ </mbean>
+</server>
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jbossesb-service.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jndi.properties
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jndi.properties (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jndi.properties 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,5 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming
+java.naming.factory.url.pkgs=org.jnp.interfaces
+
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/jndi.properties
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/juddi.properties
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/juddi.properties (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/juddi.properties 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,69 @@
+# jUDDI Registry Properties (used by RegistryServer)
+# see http://www.juddi.org for more information
+
+# The UDDI Operator Name
+juddi.operatorName = jUDDI.org
+
+# The i18n locale default codes
+juddi.i18n.languageCode = en
+juddi.i18n.countryCode = US
+
+# The UDDI DiscoveryURL Prefix
+juddi.discoveryURL = http://localhost:8080/juddi/uddiget.jsp?
+
+# The UDDI Operator Contact Email Address
+juddi.operatorEmailAddress = admin at juddi.org
+
+# The maximum name size and maximum number
+# of name elements allows in several of the
+# FindXxxx and SaveXxxx UDDI functions.
+juddi.maxNameLength=255
+juddi.maxNameElementsAllowed=5
+
+# The maximum number of UDDI artifacts allowed
+# per publisher. A value of '-1' indicates any
+# number of artifacts is valid (These values can be
+# overridden at the individual publisher level).
+juddi.maxBusinessesPerPublisher=25
+juddi.maxServicesPerBusiness=20
+juddi.maxBindingsPerService=10
+juddi.maxTModelsPerPublisher=100
+
+# jUDDI Authentication module to use
+juddi.auth = org.apache.juddi.auth.DefaultAuthenticator
+
+# jUDDI DataStore module currently to use
+juddi.dataStore = org.apache.juddi.datastore.jdbc.JDBCDataStore
+
+# use a dataSource (if set to false a direct
+# jdbc connection will be used.
+juddi.isUseDataSource=false
+juddi.jdbcDriver=com.mysql.jdbc.Driver
+juddi.jdbcUrl=jdbc:mysql://localhost:3306/juddi
+juddi.jdbcUsername=root
+juddi.jdbcPassword=admin
+# jUDDI DataSource to use
+# juddi.dataSource=java:comp/env/jdbc/MySqlDS
+
+# jUDDI UUIDGen implementation to use
+juddi.uuidgen = org.apache.juddi.uuidgen.DefaultUUIDGen
+
+# jUDDI Cryptor implementation to use
+juddi.cryptor = org.apache.juddi.cryptor.DefaultCryptor
+
+# jUDDI Validator to use
+juddi.validator=org.apache.juddi.validator.DefaultValidator
+
+# jUDDI Proxy Properties (used by RegistryProxy)
+juddi.proxy.adminURL = http://localhost:8080/juddi/admin
+juddi.proxy.inquiryURL = http://localhost:8080/juddi/inquiry
+juddi.proxy.publishURL = http://localhost:8080/juddi/publish
+juddi.proxy.transportClass = org.apache.juddi.proxy.AxisTransport
+juddi.proxy.securityProvider = com.sun.net.ssl.internal.ssl.Provider
+juddi.proxy.protocolHandler = com.sun.net.ssl.internal.www.protocol
+
+# JNDI settings (used by RMITransport)
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming
+
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/juddi.properties
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/listener.log
===================================================================
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/log4j.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/log4j.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/log4j.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!-- -->
+<!-- Log4j Configuration -->
+<!-- -->
+<!-- ===================================================================== -->
+
+<!-- $Id: log4j.xml,v 1.26.2.5 2005/09/15 09:31:02 dimitris Exp $ -->
+
+<!--
+ | For more configuration infromation and examples see the Jakarta Log4j
+ | owebsite: http://jakarta.apache.org/log4j
+ -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <!-- ============================== -->
+ <!-- Append messages to the console -->
+ <!-- ============================== -->
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
+ <param name="Target" value="System.out"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p [%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <!-- ================================= -->
+ <!-- Preserve messages in a local file -->
+ <!-- ================================= -->
+
+ <!-- A size based file rolling appender -->
+ <appender name="FILE" class="org.jboss.logging.appender.RollingFileAppender">
+ <errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
+ <param name="File" value="./listener.log"/>
+ <param name="Append" value="false"/>
+ <param name="MaxFileSize" value="500KB"/>
+ <param name="MaxBackupIndex" value="1"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t][%c] %m%n"/>
+ </layout>
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <category name="org.jboss">
+ <priority value="WARN"/>
+ </category>
+ <category name="org.jboss.soa.esb">
+ <priority value="ERROR"/>
+ </category>
+ <category name="org.jboss.internal.soa.esb">
+ <priority value="ERROR"/>
+ </category>
+ <category name="org.apache">
+ <priority value="ERROR"/>
+ </category>
+ <category name="quickstart">
+ <priority value="INFO"/>
+ </category>
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </root>
+
+</log4j:configuration>
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/log4j.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/populate.sql
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/populate.sql (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/populate.sql 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,6 @@
+insert into gateway_table(data_column, status_col) values('data 111111','P');
+insert into gateway_table(data_column, status_col) values('data 22','P');
+insert into gateway_table(data_column, status_col) values('data 333333333333111111','P');
+insert into gateway_table(data_column, status_col) values('data d d d d','P');
+insert into gateway_table(data_column, status_col) values('do not consume', 'P');
+insert into gateway_table(data_column, status_col) values('data last record','P');
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/quickstart-ds.xml
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/quickstart-ds.xml (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/quickstart-ds.xml 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<datasources>
+ <local-tx-datasource>
+ <jndi-name>QuickstartDB</jndi-name>
+ <connection-url>jdbc:hsqldb:hsql://${jboss.bind.address}:1703</connection-url>
+ <driver-class>org.hsqldb.jdbcDriver</driver-class>
+ <user-name>sa</user-name>
+ <password></password>
+ <min-pool-size>5</min-pool-size>
+ <max-pool-size>20</max-pool-size>
+ <idle-timeout-minutes>0</idle-timeout-minutes>
+ <depends>jboss:service=Hypersonic</depends>
+ <prepared-statement-cache-size>32</prepared-statement-cache-size>
+ </local-tx-datasource>
+
+ <!-- For hsqldb accessed from jboss only, in-process (standalone) mode -->
+ <mbean code="org.jboss.jdbc.HypersonicDatabase"
+ name="jboss:service=Hypersonic">
+ <attribute name="Port">1703</attribute>
+ <attribute name="BindAddress">${jboss.bind.address}</attribute>
+ <attribute name="Database">quickstartDB</attribute>
+ <attribute name="Silent">true</attribute>
+ <attribute name="Trace">false</attribute>
+ <attribute name="No_system_exit">true</attribute>
+ </mbean>
+</datasources>
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/quickstart-ds.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/readme.txt
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/readme.txt (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/readme.txt 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,32 @@
+Overview:
+=========
+ This quickstart demonstrates an SQL listener.
+
+Running this quickstart:
+========================
+ Please refer to 'ant help-quickstarts' for prerequisites about the quickstarts
+ and a more detailed descripton of the different ways to run the quickstarts.
+
+ NOTE:
+ 1. This quickstart uses hsqldb so that it can be deployed without any
+ setup required.
+ 2. "ant select" will show you the contents of the database table. The
+ <sql-message-filter> defined has a where-condition, so one of the rows
+ that the table is populated with will never be processed. There
+ should be one remaining row for each time the table is populated.
+
+To Run standalone mode:
+=======================
+ 1. In a command terminal window in this folder ("Window1"), type 'ant run'.
+ 2. Open another command terminal window in this folder ("Window2"), type
+ 'ant runtest'.
+ 3. Switch back to "Window1" to see the output from the ESB
+ 4. When finished, interrupt the ESB using Ctrl-C.
+
+To Run '.esb' archive mode:
+===========================
+ 1. In a command terminal window in this folder ("Window1"), type 'ant deploy'.
+ 2. Open another command terminal window in this folder ("Window2"), type
+ 'ant runtest'.
+ 3. Switch back to Application Server console to see the output from the ESB
+ 4. In this folder ("Window1"), type 'ant undeploy'.
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/hsqldb/create.sql
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/hsqldb/create.sql (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/hsqldb/create.sql 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,8 @@
+create table gateway_table
+(
+unique_id VARCHAR(255) NOT NULL,
+data_column VARCHAR(1024) NOT NULL,
+status_col VARCHAR(255) NOT NULL,
+insert_timestamp VARCHAR(30) NOT NULL,
+retrytime VARCHAR(30) NOT NULL
+);
Added: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/MyAction.java
===================================================================
--- labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/MyAction.java (rev 0)
+++ labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/MyAction.java 2007-12-03 11:08:09 UTC (rev 16982)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.samples.quickstart.helloworldsqlaction;
+
+import org.jboss.soa.esb.actions.AbstractActionLifecycle;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+
+import java.util.Map;
+
+public class MyAction extends AbstractActionLifecycle {
+
+ private ConfigTree _config;
+ private int exceptions = 0;
+
+ public MyAction(ConfigTree config) {
+ _config = config;
+ }
+
+ public Message noOperation(Message message) {
+ return message;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Message displayMessage(Message message) throws Exception {
+ //logHeader();
+ //System.out.println(message.getBody().get());
+ //logFooter();
+ if(exceptions < 5) {
+ exceptions++;
+ throw new RuntimeException("Hey there - error!!");
+ }
+ //System.out.println("Action successful....");
+ /*
+ Map<String, Object> rowData = (Map) message.getBody().get();
+ StringBuffer results = new StringBuffer();
+ for (Map.Entry<String, Object> curr : rowData.entrySet()) {
+ results.append("column " + curr.getKey() + " = <" + curr.getValue() + ">");
+ }
+ System.out.println(results.toString());
+ logFooter();
+
+ // Set message properties and message body so that SystemPrintln will display message
+ message.getProperties().setProperty("jbesbfilename", "helloworldSQlAction.log");
+ message.getBody().add(results.toString());
+ */
+
+ return message;
+ }
+
+ // This makes it easier to read on the console
+ private void logHeader() {
+ System.out.println("\n&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
+ }
+
+ private void logFooter() {
+ System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&\n");
+ }
+
+
+}
Property changes on: labs/jbossesb/workspace/SQLCourier_XA/product/samples/quickstarts/helloworld_sql2_action/src/org/jboss/soa/esb/samples/quickstart/helloworldsqlaction/MyAction.java
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list