[jboss-svn-commits] JBL Code SVN: r8394 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb/listeners rosetta/src/org/jboss/internal/soa/esb/couriers rosetta/src/org/jboss/soa/esb/addressing/eprs

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Dec 18 20:53:58 EST 2006


Author: estebanschifman
Date: 2006-12-18 20:53:49 -0500 (Mon, 18 Dec 2006)
New Revision: 8394

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
Log:
deliver() and pickup() are now fully functional for SqlTableCourier

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-19 01:53:49 UTC (rev 8394)
@@ -230,7 +230,9 @@
 			throw new ConfigurationException("URL in "
 					+ListenerTagNames.URL_TAG+" must be a jdbc URL");
 
-		JDBCEpr epr = new JDBCEpr(url,"");	
+		boolean bPostDel 	= Boolean.valueOf(tree.getAttribute(JDBCEpr.POST_DEL_TAG,"true"));
+		boolean bErrorDel 	= Boolean.valueOf(tree.getAttribute(JDBCEpr.ERROR_DEL_TAG,"true"));
+		JDBCEpr epr = new JDBCEpr(url,bPostDel,bErrorDel);	
 		epr.setDriver			(tree.getAttribute(JDBCEpr.DRIVER_TAG		,null));
 		epr.setUserName			(getAttrAndWarn(tree,JDBCEpr.USERNAME_TAG	,""));
 		epr.setPassword 		(getAttrAndWarn(tree,JDBCEpr.PASSWORD_TAG	,""));

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-19 01:53:49 UTC (rev 8394)
@@ -22,6 +22,7 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.sql.PreparedStatement;
@@ -36,6 +37,7 @@
 import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXParseException;
 
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier 
 {
@@ -59,8 +61,13 @@
 		_isReceiver = isReceiver;
 		_epr		= epr;
 		_sleepForRetries	= 3000;
+		try
+		{
+			_postDelete = Boolean.TRUE.equals(Boolean.valueOf(epr.getPostDelete()));
+			_errorDelete= Boolean.TRUE.equals(Boolean.valueOf(epr.getErrorDelete()));
+		}
+		catch (URISyntaxException e) { throw new CourierException(e); }
 
-
 	} //________________________________
 
 	void cleanup() 
@@ -138,16 +145,6 @@
 		return false;
 	} //________________________________
 	
-	private ResultSet getRowList() throws CourierException
-	{
-		if (null==_conn)
-			try {  _conn = getConn();}
-			catch (Exception e) {throw new CourierException(e); }
-		ResultSet RS = null;
-		return RS;
-		
-	} //_______________________________
-	
 	public Message pickup(long millis) throws CourierException 
 	{
 		Message result = null;
@@ -158,7 +155,7 @@
 			ResultSet RS = getRowList();
 			try
 			{
-				while (RS.next())
+				while (null!=RS && RS.next())
 				{
 					String messageId = RS.getString(1);
 					if (null==(result=tryToPickup(messageId)))
@@ -178,13 +175,86 @@
 		return null;
     } //________________________________
 
-    private Message tryToPickup(String messageId) throws CourierException
+    private Message tryToPickup(String messageId) throws CourierException, SQLException
     {
+    	int iParm = 1;
+    	select4UpdateStatement().setString(iParm++, messageId);
+    	select4UpdateStatement().setString(iParm++, State.Pending.getColumnValue());
+		while(_conn != null)
+		{
+			try 
+			{ 
+				ResultSet RS = _conn.execQueryWait(select4UpdateStatement(), 3);
+				while (RS.next())
+				{
+					Exception eBad = null;
+					try
+					{
+						Message result = Util.deserialize((Serializable)RS.getObject(1));
+						if (_postDelete)
+							deleteMsg(messageId);
+						else
+							changeStatus(messageId,State.Done);
+						return result;
+					}
+					catch (ClassCastException e){ eBad = e;}
+					catch (SAXParseException  e){ eBad = e;}
+					catch (Exception e) { throw new CourierException(e); }
+					if (null!=eBad)
+					{
+						if (_errorDelete)
+							deleteMsg(messageId);
+						else
+							changeStatus(messageId, State.Error);
+						continue;
+					}
+				}
+				return null;
+			}
+			catch (SQLException e)	{ throw new CourierException(e); }
+			catch (Exception e)  	{ jdbcConnectRetry(e); }
+		}
     	return null;
     } //________________________________
+    
+    private void deleteMsg(String messageId)
+		throws Exception
+	{
+		int iParm = 1;
+		deleteStatement().setString(iParm++, messageId);
+		_conn.execUpdWait(deleteStatement(), 3);
+		_conn.commit();
+		
+	}
+	
+    private void changeStatus(String messageId, State to)
+    	throws Exception
+    {
+    	int iParm = 1;
+		updateStatusStatement().setString(iParm++, to.getColumnValue());
+		updateStatusStatement().setString(iParm++, messageId);
+		_conn.execUpdWait(updateStatusStatement(),3);
+		_conn.commit();
+    	
+    }
 
+	private ResultSet getRowList() throws CourierException
+	{
+		if (null==_conn)
+			try {  _conn = getConn();}
+			catch (Exception e) {throw new CourierException(e); }
+		while(_conn != null)
+		{
+			try { return _conn.execQueryWait(listStatement(), 3); }
+			catch (Exception e)  {jdbcConnectRetry(e); }
+		}
+		return null;
+		
+	} //_______________________________
+	
     private void jdbcConnectRetry(Exception exc)
     {
+    	_logger.error("DB problem, will try to reconnect",exc);
     	if (null!=_conn)
     		_conn.release();
     	_conn	= null;
@@ -332,6 +402,7 @@
 
     protected long 				_sleepForRetries = 3000;   //  milliseconds
 
+    protected boolean			_postDelete	,_errorDelete;
 	protected boolean			_isReceiver;	  
     protected JDBCEpr			_epr;
     protected JdbcCleanConn		_conn;
@@ -340,6 +411,7 @@
     protected PreparedStatement _prepUpdateStatus;
     protected PreparedStatement	_prepInsert;
     protected PreparedStatement	_prepDelete;
+    
 
     protected static Logger		_logger = Logger.getLogger(SqlTableCourier.class);
 }

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2006-12-18 22:52:58 UTC (rev 8393)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2006-12-19 01:53:49 UTC (rev 8394)
@@ -55,6 +55,8 @@
 	public static final String STATUS_COLUMN_TAG 		= "status_column";
 	public static final String DATA_COLUMN_TAG 			= "message_column";
 	public static final String TIMESTAMP_COLUMN_TAG 	= "insert_timestamp_column";
+	public static final String POST_DEL_TAG				= FileEpr.POST_DEL_TAG;
+	public static final String ERROR_DEL_TAG			= FileEpr.ERROR_DEL_TAG;
 	
 	public JDBCEpr (EPR epr)
 	{
@@ -68,6 +70,20 @@
 		setSQL(sql);
 	}
 
+	public JDBCEpr (String url) throws URISyntaxException
+	{
+		this(url,true,true);
+	}
+
+	public JDBCEpr (String url, boolean postDelete, boolean errorDelete) throws URISyntaxException
+	{
+		super(new URI(url));
+		if (postDelete)
+			getAddr().addExtension(POST_DEL_TAG,Boolean.toString(postDelete));
+		if (errorDelete)
+			getAddr().addExtension(ERROR_DEL_TAG,Boolean.toString(errorDelete));
+	}
+
 	/**
 	 * Set the URL for this endpoint.
 	 * 
@@ -352,8 +368,17 @@
 		return getAddr().getExtensionValue(TIMESTAMP_COLUMN_TAG);
 	}
 	
-
 	
+	public final String getPostDelete() throws URISyntaxException
+	{
+	    return getAddr().getExtensionValue(POST_DEL_TAG);
+	}
+	
+	public final String getErrorDelete() throws URISyntaxException
+	{
+	    return getAddr().getExtensionValue(ERROR_DEL_TAG);
+	}
+	
 	public static final URI type ()
 	{
 	    return _type;




More information about the jboss-svn-commits mailing list