[jboss-svn-commits] JBL Code SVN: r8394 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb/listeners rosetta/src/org/jboss/internal/soa/esb/couriers rosetta/src/org/jboss/soa/esb/addressing/eprs
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Dec 18 20:53:58 EST 2006
Author: estebanschifman
Date: 2006-12-18 20:53:49 -0500 (Mon, 18 Dec 2006)
New Revision: 8394
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
Log:
deliver() and pickup() are now fully functional for SQLCourier
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-19 01:53:49 UTC (rev 8394)
@@ -230,7 +230,9 @@
throw new ConfigurationException("URL in "
+ListenerTagNames.URL_TAG+" must be a jdbc URL");
- JDBCEpr epr = new JDBCEpr(url,"");
+ boolean bPostDel = Boolean.valueOf(tree.getAttribute(JDBCEpr.POST_DEL_TAG,"true"));
+ boolean bErrorDel = Boolean.valueOf(tree.getAttribute(JDBCEpr.ERROR_DEL_TAG,"true"));
+ JDBCEpr epr = new JDBCEpr(url,bPostDel,bErrorDel);
epr.setDriver (tree.getAttribute(JDBCEpr.DRIVER_TAG ,null));
epr.setUserName (getAttrAndWarn(tree,JDBCEpr.USERNAME_TAG ,""));
epr.setPassword (getAttrAndWarn(tree,JDBCEpr.PASSWORD_TAG ,""));
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2006-12-19 01:53:49 UTC (rev 8394)
@@ -22,6 +22,7 @@
package org.jboss.internal.soa.esb.couriers;
+import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.PreparedStatement;
@@ -36,6 +37,7 @@
import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXParseException;
public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
@@ -59,8 +61,13 @@
_isReceiver = isReceiver;
_epr = epr;
_sleepForRetries = 3000;
+ try
+ {
+ _postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr.getPostDelete()));
+ _errorDelete= Boolean.TRUE.equals(Boolean.valueOf(epr.getErrorDelete()));
+ }
+ catch (URISyntaxException e) { throw new CourierException(e); }
-
} //________________________________
void cleanup()
@@ -138,16 +145,6 @@
return false;
} //________________________________
- private ResultSet getRowList() throws CourierException
- {
- if (null==_conn)
- try { _conn = getConn();}
- catch (Exception e) {throw new CourierException(e); }
- ResultSet RS = null;
- return RS;
-
- } //_______________________________
-
public Message pickup(long millis) throws CourierException
{
Message result = null;
@@ -158,7 +155,7 @@
ResultSet RS = getRowList();
try
{
- while (RS.next())
+ while (null!=RS && RS.next())
{
String messageId = RS.getString(1);
if (null==(result=tryToPickup(messageId)))
@@ -178,13 +175,86 @@
return null;
} //________________________________
- private Message tryToPickup(String messageId) throws CourierException
+ private Message tryToPickup(String messageId) throws CourierException, SQLException
{
+ int iParm = 1;
+ select4UpdateStatement().setString(iParm++, messageId);
+ select4UpdateStatement().setString(iParm++, State.Pending.getColumnValue());
+ while(_conn != null)
+ {
+ try
+ {
+ ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
+ while (RS.next())
+ {
+ Exception eBad = null;
+ try
+ {
+ Message result = Util.deserialize((Serializable)RS.getObject(1));
+ if (_postDelete)
+ deleteMsg(messageId);
+ else
+ changeStatus(messageId,State.Done);
+ return result;
+ }
+ catch (ClassCastException e){ eBad = e;}
+ catch (SAXParseException e){ eBad = e;}
+ catch (Exception e) { throw new CourierException(e); }
+ if (null!=eBad)
+ {
+ if (_errorDelete)
+ deleteMsg(messageId);
+ else
+ changeStatus(messageId, State.Error);
+ continue;
+ }
+ }
+ return null;
+ }
+ catch (SQLException e) { throw new CourierException(e); }
+ catch (Exception e) { jdbcConnectRetry(e); }
+ }
return null;
} //________________________________
+
+ private void deleteMsg(String messageId)
+ throws Exception
+ {
+ int iParm = 1;
+ deleteStatement().setString(iParm++, messageId);
+ _conn.execUpdWait(deleteStatement(), 3);
+ _conn.commit();
+
+ }
+
+ private void changeStatus(String messageId, State to)
+ throws Exception
+ {
+ int iParm = 1;
+ updateStatusStatement().setString(iParm++, to.getColumnValue());
+ updateStatusStatement().setString(iParm++, messageId);
+ _conn.execUpdWait(updateStatusStatement(),3);
+ _conn.commit();
+
+ }
+ private ResultSet getRowList() throws CourierException
+ {
+ if (null==_conn)
+ try { _conn = getConn();}
+ catch (Exception e) {throw new CourierException(e); }
+ while(_conn != null)
+ {
+ try { return _conn.execQueryWait(listStatement(), 3); }
+ catch (Exception e) {jdbcConnectRetry(e); }
+ }
+ return null;
+
+ } //_______________________________
+
private void jdbcConnectRetry(Exception exc)
{
+ _logger.error("DB problem, will try to reconnect",exc);
if (null!=_conn)
_conn.release();
_conn = null;
@@ -332,6 +402,7 @@
protected long _sleepForRetries = 3000; // milliseconds
+ protected boolean _postDelete ,_errorDelete;
protected boolean _isReceiver;
protected JDBCEpr _epr;
protected JdbcCleanConn _conn;
@@ -340,6 +411,7 @@
protected PreparedStatement _prepUpdateStatus;
protected PreparedStatement _prepInsert;
protected PreparedStatement _prepDelete;
+
protected static Logger _logger = Logger.getLogger(SqlTableCourier.class);
}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java 2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java 2006-12-19 01:53:49 UTC (rev 8394)
@@ -55,6 +55,8 @@
public static final String STATUS_COLUMN_TAG = "status_column";
public static final String DATA_COLUMN_TAG = "message_column";
public static final String TIMESTAMP_COLUMN_TAG = "insert_timestamp_column";
+ public static final String POST_DEL_TAG = FileEpr.POST_DEL_TAG;
+ public static final String ERROR_DEL_TAG = FileEpr.ERROR_DEL_TAG;
public JDBCEpr (EPR epr)
{
@@ -68,6 +70,20 @@
setSQL(sql);
}
+ public JDBCEpr (String url) throws URISyntaxException
+ {
+ this(url,true,true);
+ }
+
+ public JDBCEpr (String url, boolean postDelete, boolean errorDelete) throws URISyntaxException
+ {
+ super(new URI(url));
+ if (postDelete)
+ getAddr().addExtension(POST_DEL_TAG,Boolean.toString(postDelete));
+ if (errorDelete)
+ getAddr().addExtension(ERROR_DEL_TAG,Boolean.toString(errorDelete));
+ }
+
/**
* Set the URL for this endpoint.
*
@@ -352,8 +368,17 @@
return getAddr().getExtensionValue(TIMESTAMP_COLUMN_TAG);
}
-
+ public final String getPostDelete() throws URISyntaxException
+ {
+ return getAddr().getExtensionValue(POST_DEL_TAG);
+ }
+
+ public final String getErrorDelete() throws URISyntaxException
+ {
+ return getAddr().getExtensionValue(ERROR_DEL_TAG);
+ }
+
public static final URI type ()
{
return _type;
More information about the jboss-svn-commits
mailing list