[jboss-svn-commits] JBL Code SVN: r8391 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb/listeners rosetta/src/org/jboss/internal/soa/esb/couriers rosetta/src/org/jboss/soa/esb/addressing/eprs rosetta/src/org/jboss/soa/esb/helpers/persist

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Dec 18 17:34:49 EST 2006


Author: estebanschifman
Date: 2006-12-18 17:34:38 -0500 (Mon, 18 Dec 2006)
New Revision: 8391

Added:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/SimpleDataSource.java
Log:
SQLCourier implementation - deliver() is operational - still some work to be done on pickup()

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -25,11 +25,13 @@
  */
 package org.jboss.soa.esb.listeners;
 
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+
 public class ListenerTagNames 
 {
 	/** EPRs */
 	public static final String EPR_TAG						= "EPR";
-	public static final String URL_TAG						= "URL";
+	public static final String URL_TAG						= JDBCEpr.URL_TAG; // change only in one place
 	public static final String PROTOCOL_TAG					= "protocol";
 
 	/** Threading */

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -1,7 +1,6 @@
 package org.jboss.soa.esb.listeners;
 
 import java.io.File;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collection;
 
@@ -10,6 +9,7 @@
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.FTPEpr;
 import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.Courier;
 import org.jboss.soa.esb.couriers.CourierFactory;
@@ -85,22 +85,16 @@
     	throws ConfigurationException
     {
     	String urlString = tree.getAttribute(ListenerTagNames.URL_TAG);
-    	URL url = null;
-    	if (null!=urlString)
-    		try { url = new URL(urlString); }
-    		catch (MalformedURLException e) 
-    		{
-    			throw new ConfigurationException("Invalid URL syntax for EPR",e);
-    		}
-    		
-    	String protocol = (null==url) ? tree.getAttribute(ListenerTagNames.PROTOCOL_TAG)
-    			: url.getProtocol();
+    	String protocol = (null==urlString) 
+    		? tree.getAttribute(ListenerTagNames.PROTOCOL_TAG)
+    		: urlString.split(":")[0];
 
     	try
     	{
     		if ("jms"	.equals(protocol))  return jmsEprFromElement(tree);
     		if ("file"	.equals(protocol))  return fileEprFromElement(tree);
     		if ("ftp"	.equals(protocol))  return fileEprFromElement(tree);
+    		if ("jdbc"	.equals(protocol))  return jdbcEprFromElement(tree);
     	}
     	catch (Exception e) 
     	{ 
@@ -228,6 +222,35 @@
 	} //________________________________
 	
 	
+	public static JDBCEpr jdbcEprFromElement(ConfigTree tree) // builds a JDBCEpr from the attributes of 'tree'
+		throws Exception
+	{
+		String url = tree.getAttribute(JDBCEpr.URL_TAG,null);
+		if (!url.toLowerCase().startsWith("jdbc")) // NOTE(review): url is presumably null when the attribute is absent -> NullPointerException here; confirm
+			throw new ConfigurationException("URL in "
+					+ListenerTagNames.URL_TAG+" must be a jdbc URL");
+
+		JDBCEpr epr = new JDBCEpr(url,"");	
+		epr.setDriver			(tree.getAttribute(JDBCEpr.DRIVER_TAG		,null));
+		epr.setUserName			(getAttrAndWarn(tree,JDBCEpr.USERNAME_TAG	,""));
+		epr.setPassword 		(getAttrAndWarn(tree,JDBCEpr.PASSWORD_TAG	,""));
+		epr.setTableName		(tree.getAttribute(JDBCEpr.TABLE_NAME_TAG 	,null)); // setTableName() rejects null, so a missing tablename presumably fails here
+		epr.setMessageIdColumn	(getColName(tree,JDBCEpr.MESSAGE_ID_COLUMN_TAG)); // column names default to the tag minus "_column" (see getColName)
+		epr.setStatusColumn		(getColName(tree,JDBCEpr.STATUS_COLUMN_TAG));
+		epr.setDataColumn		(getColName(tree,JDBCEpr.DATA_COLUMN_TAG));
+		epr.setTimestampColumn	(getColName(tree,JDBCEpr.TIMESTAMP_COLUMN_TAG));
+		
+		return epr;
+	} //________________________________
+	
+	private static final String s_Sfx = "_column"; // suffix stripped from a tag to obtain its default column name
+	private static String getColName(ConfigTree tree,String tag) throws ConfigurationException // attribute 'tag' of 'tree', via getAttrAndWarn with a derived default
+	{
+		String defaultColname = (tag.endsWith(s_Sfx)?tag.substring(0,tag.length()-s_Sfx.length()) : null); // e.g. "status_column" -> "status"; null when the tag lacks the suffix
+		return getAttrAndWarn(tree, tag, defaultColname);
+	} //________________________________
+
+
 	public static String getAttrAndWarn(ConfigTree tree, String tag,String defaultValue)
 		throws ConfigurationException
 	{

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -0,0 +1,345 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.couriers;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.Util;
+
+public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier 
+{
+	/**
+	 * private default constructor - a courier cannot be created without an EPR
+	 */
+	private SqlTableCourier() { }
+
+	/**
+	 * package protected constructor - Objects of Courier should only be instantiated by the Factory
+	 * @param epr JDBCEpr - supplies driver, URL, credentials, table and column names; courier is created as non-receiver
+	 */
+	SqlTableCourier(JDBCEpr epr) throws CourierException { this(epr,false); }
+
+	/**
+	 * package protected constructor - Objects of Courier should only be instantiated by the Factory
+	 * @param epr JDBCEpr - supplies driver, URL, credentials, table and column names (read by getConn() and the *Statement() methods)
+	 */
+	SqlTableCourier(JDBCEpr epr, boolean isReceiver) throws CourierException // isReceiver==true makes deliver() throw ("read-only Courier")
+	{
+		_isReceiver = isReceiver;
+		_epr		= epr;
+		_sleepForRetries	= 3000; // milliseconds; same value as the field initializer
+
+
+	} //________________________________
+
+	void cleanup() // releases the connection if one exists; a failure is only logged
+	{
+		if (null != _conn)
+			try { _conn.release(); }
+			catch (Exception e) 
+			{ 
+				_logger.info("Unable to release connection",e);
+			}
+		// NOTE(review): _conn and the cached prepared statements are not reset to null here
+    } //________________________________
+
+	/**
+	 * serialize the ESB message (Util.serialize) and insert it as a new row in state Pending, then commit
+	 * @param message Message - the message to deliver; its header Call must carry a message ID
+	 * @return boolean - true once the row is inserted and committed; false for a null message or when no connection could be re-established
+	 * @throws CourierException - read-only courier, missing message ID, failure to obtain the connection, or SQLException during the insert
+	 */
+	public boolean deliver(Message message) throws CourierException 
+	{
+		if (_isReceiver)
+			throw new CourierException("This is a read-only Courier");
+
+		if (null==message)
+			return false;
+		
+		
+		String msgId = null;
+		try
+		{
+			Call call = message.getHeader().getCall();
+			URI  uri  = (null==call) ? null : call.getMessageID();
+			msgId = (null==uri) ? null : uri.toString();
+			if (null==msgId)
+				throw new CourierException ("Message ID must not be null");
+		}
+		catch (URISyntaxException e) { throw new CourierException(e); }
+
+		if (null==_conn)
+			try {  _conn = getConn();}
+			catch (Exception e) {throw new CourierException(e); }
+
+		while(_conn != null) // repeats after jdbcConnectRetry() has replaced the connection
+		{
+			try 
+			{
+				int iCol = 1;
+				PreparedStatement PS = insertStatement(); // NOTE(review): null when prepare failed; the NPE below lands in catch(Exception) and is retried, possibly without bound
+				PS.setString(iCol++, msgId); // parameter order matches the column order built in insertStatement()
+				PS.setObject(iCol++, Util.serialize(message));
+				PS.setString(iCol++, State.Pending.getColumnValue());
+				PS.setLong	(iCol++, System.currentTimeMillis());
+
+				_conn.execUpdWait(PS, 3);
+				_conn.commit();
+				return true;
+			}
+			catch (SQLException e)	// SQL failure: roll back, log, and give up
+			{
+				if (null!=_conn)
+					try { _conn.rollback(); }
+					catch (Exception roll)
+					{
+						_logger.error(roll);
+					}
+				_logger.error("SQL error",e);
+				throw new CourierException(e);
+			}
+			catch (Exception e)		// anything else: drop the connection and cached statements, try to reconnect
+			{ 
+				jdbcConnectRetry(e);
+			}
+		}
+		return false; // reached only when _conn is null after a reconnect attempt
+	} //________________________________
+	
+	private ResultSet getRowList() throws CourierException // makes sure a connection exists; the actual query is not implemented yet
+	{
+		if (null==_conn)
+			try {  _conn = getConn();}
+			catch (Exception e) {throw new CourierException(e); }
+		ResultSet RS = null; // NOTE(review): never assigned a real result set - this method always returns null
+		return RS;
+		
+	} //_______________________________
+	
+	public Message pickup(long millis) throws CourierException // polls for a message until 'millis' have elapsed; returns null if none was picked up
+	{
+		Message result = null;
+		long limit = System.currentTimeMillis()
+				+ ((millis < 100) ? 100 : millis); // poll for at least 100 ms
+		do
+		{
+			ResultSet RS = getRowList(); // NOTE(review): getRowList() currently returns null, so RS.next() below throws NullPointerException
+			try
+			{
+				while (RS.next())
+				{
+					String messageId = RS.getString(1); // presumably the message id column, as selected first by listStatement() - confirm once getRowList() is implemented
+					if (null==(result=tryToPickup(messageId)))
+						continue;
+					return result;
+				}
+				try { Thread.sleep(200); } // pause between polls
+				catch (InterruptedException e) { return null; }
+			}
+			catch (SQLException e)
+			{
+				_logger.error(e);
+				return null;
+			}
+
+		} while (System.currentTimeMillis() <= limit);
+		return null;
+    } //________________________________
+
+    private Message tryToPickup(String messageId) throws CourierException // stub: 'messageId' is ignored
+    {
+    	return null; // not implemented yet - pickup() treats null as "try the next row"
+    } //________________________________
+
+    private void jdbcConnectRetry(Exception exc) // drops the connection and cached statements, then tries to reconnect; 'exc' is not used
+    {
+    	if (null!=_conn)
+    		_conn.release();
+    	_conn	= null;
+
+    	_prepDelete = 
+    	_prepGetList = 
+    	_prepInsert = 
+    	_prepSel4Upd = 
+    	_prepUpdateStatus 
+    		= null; // statements prepared on the released connection are discarded
+    	for (int i1=0; i1<3; i1++) // NOTE(review): no break after success; later iterations just get the cached _conn back from getConn()
+    	{
+    		try { _conn = getConn(); }
+    		catch (Exception e)
+    		{
+    			try {Thread.sleep(_sleepForRetries); }
+    			catch (InterruptedException eInt) { return; } // NOTE(review): interrupt status is not restored
+    		}
+    	}
+    } //________________________________
+    
+    private JdbcCleanConn getConn() throws Exception // returns the cached connection, creating it from the EPR settings on first use
+    {
+    	if (null==_conn)
+    	{
+    		SimpleDataSource DS = new SimpleDataSource
+    			(_epr.getDriver(),_epr.getURL(),_epr.getUserName(), _epr.getPassword());
+    		_conn = new JdbcCleanConn(DS);
+    	}
+    	return _conn;
+    } //________________________________
+    
+    protected PreparedStatement listStatement() // lazily prepares and caches the "list pending rows" query; returns null if preparing fails
+    {
+    	if (null==_prepGetList)
+    		
+	    	try
+	    	{
+		        String[] columns =	{_epr.getMessageIdColumn(),_epr.getTimestampColumn()};
+		
+		    	StringBuilder sb = new StringBuilder ("select");
+		    	int i1 = 0;
+		    	for (String col:columns)
+		    		sb.append((i1++<1)?" ":",").append(col); // blank before the first column, comma before the others
+		    	sb.append(" from ").append(_epr.getTableName());
+		    	sb.append(" where ").append(_epr.getStatusColumn())
+		    		.append("='").append(State.Pending.getColumnValue()).append("'") // status literal is "P" (first letter of Pending)
+		    		.append(" order by 2"); // 2 = second selected column, i.e. the timestamp column
+		    	;
+		    	_prepGetList = getConn().prepareStatement(sb.toString());
+	    	}
+	    	catch (Exception e)
+	    	{
+	    		_logger.error("Unable to prepare SQL statement",e);
+	    		return null;
+	    	}
+	    return _prepGetList;
+    } //________________________________
+
+    protected PreparedStatement select4UpdateStatement() // lazily prepares and caches the row-locking select; returns null if preparing fails
+    {
+    	if (null==_prepSel4Upd)
+	    	try
+	    	{
+		    	StringBuilder sb = new StringBuilder ("select ") // select <data> from <table> where <id>=? and <status>=? for update
+					.append(_epr.getDataColumn()).append(" from ") .append(_epr.getTableName())
+					.append(" where ").append(_epr.getMessageIdColumn()).append("=?")
+					.append(" and ")  .append(_epr.getStatusColumn())	.append("=?")
+		    		.append(" for update")
+		    	;
+		    	_prepSel4Upd = getConn().prepareStatement(sb.toString()); 
+	    	}
+	    	catch (Exception e)
+	    	{
+	    		_logger.error(e);
+	    		return null;
+	    	}
+    	return _prepSel4Upd;
+    } //________________________________
+
+    protected PreparedStatement updateStatusStatement() // lazily prepares and caches the status update; returns null if preparing fails
+    {
+    	if (null==_prepUpdateStatus)
+	    	try
+	    	{
+		    	StringBuilder sb = new StringBuilder ("update ").append(_epr.getTableName()) // update <table> set <status>= ? where <id>=?
+		    		.append(" set ").append(_epr.getStatusColumn())		.append("= ?")
+		    		.append(" where ").append(_epr.getMessageIdColumn()).append("=?")
+		    	;
+		    	_prepUpdateStatus = getConn().prepareStatement(sb.toString()); 
+			}
+			catch (Exception e)
+			{
+				_logger.error(e);
+				return null;
+			}
+		return _prepUpdateStatus;
+    } //________________________________
+
+    protected PreparedStatement insertStatement() // lazily prepares and caches the insert used by deliver(); returns null if preparing fails
+    {
+    	if (null==_prepInsert)
+	    	try
+	    	{
+		        String[] columns =	{_epr.getMessageIdColumn()	,_epr.getDataColumn() // order must match the setXxx() calls in deliver()
+		        					,_epr.getStatusColumn()		,_epr.getTimestampColumn()};
+		    	
+		        StringBuilder sb = new StringBuilder ("insert into ").append(_epr.getTableName())
+		        	.append("(");
+		    	int i1 = 0;
+		    	for (String col:columns)
+		    		sb.append((i1++<1)?" ":",").append(col); // blank before the first column, comma before the others
+		    	sb.append(") values (?,?,?,?)"); // one placeholder per entry in 'columns'
+		    	_prepInsert = getConn().prepareStatement(sb.toString());
+			}
+			catch (Exception e)
+			{
+				_logger.error(e);
+				return null;
+			}
+		return _prepInsert;
+    } //________________________________
+
+    protected PreparedStatement deleteStatement() // lazily prepares and caches the delete-by-id statement; returns null if preparing fails
+    {
+    	if (null==_prepDelete)
+	    	try
+	    	{
+		        StringBuilder sb = new StringBuilder ("delete from ").append(_epr.getTableName()) // delete from <table> where <id> =?
+		        	.append(" where ").append(_epr.getMessageIdColumn()).append(" =?");
+		    	_prepDelete = getConn().prepareStatement(sb.toString());
+			}
+			catch (Exception e)
+			{
+				_logger.error(e);
+				return null;
+			}
+		return _prepDelete;
+    } //________________________________
+
+    protected enum State // row states; the status column holds the value from getColumnValue()
+    {	Pending, WorkInProgress, Done, Error;
+    	String	getColumnValue() {return toString().substring(0,1); } // first letter of the constant name: "P", "W", "D", "E"
+    };
+
+    protected long 				_sleepForRetries = 3000;   //  milliseconds
+
+	protected boolean			_isReceiver;	  
+    protected JDBCEpr			_epr;
+    protected JdbcCleanConn		_conn;
+    protected PreparedStatement _prepGetList;;
+    protected PreparedStatement _prepSel4Upd;
+    protected PreparedStatement _prepUpdateStatus;
+    protected PreparedStatement	_prepInsert;
+    protected PreparedStatement	_prepDelete;
+
+    protected static Logger		_logger = Logger.getLogger(SqlTableCourier.class);
+}

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -27,6 +27,7 @@
 
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -81,6 +82,8 @@
 			return new JmsCourier((JMSEpr)toEPR);
 		if (toEPR instanceof FileEpr)
 			return new FileCourier((FileEpr)toEPR);
+		if (toEPR instanceof JDBCEpr)
+			return new SqlTableCourier((JDBCEpr)toEPR);
 		
 		throw new CourierException ("Deliver courier for "+toEPR.getClass().getSimpleName()+" not supported yet");
 	}
@@ -93,6 +96,9 @@
 			return new JmsCourier((JMSEpr)replyToEPR,true);
 		if (replyToEPR instanceof FileEpr)
 			return new FileCourier((FileEpr)replyToEPR,true);
+		if (replyToEPR instanceof JDBCEpr)
+			return new SqlTableCourier((JDBCEpr)replyToEPR,true);
+
 		throw new CourierException ("Couriers for "+replyToEPR.getClass().getSimpleName()+" not supported yet");
 	}
 	

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/addressing/eprs/JDBCEpr.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -49,6 +49,12 @@
 	public static final String SQL_TAG = "sql";
 	public static final String DRIVER_TAG = "driver";
 	public static final String URL_TAG = "URL";
+
+	public static final String TABLE_NAME_TAG 			= "tablename";
+	public static final String MESSAGE_ID_COLUMN_TAG 	= "message_id_column";
+	public static final String STATUS_COLUMN_TAG 		= "status_column";
+	public static final String DATA_COLUMN_TAG 			= "message_column";
+	public static final String TIMESTAMP_COLUMN_TAG 	= "insert_timestamp_column";
 	
 	public JDBCEpr (EPR epr)
 	{
@@ -201,6 +207,153 @@
 		return getAddr().getExtensionValue(DRIVER_TAG);
 	}
 	
+	/**
+	 * Set the tablename that is used by this EPR. Once set, a further call throws IllegalStateException.
+	 * 
+	 * @param tableName the table name; null is rejected with IllegalArgumentException.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final void setTableName (String tableName) throws URISyntaxException
+	{
+		if (tableName == null)
+			throw new IllegalArgumentException();
+		
+		if (tableNameSet)
+			throw new IllegalStateException("Table name already set.");
+		
+		getAddr().addExtension(TABLE_NAME_TAG, tableName);
+		tableNameSet = true;
+	}
+	
+	/**
+	 * @return the table name used by this EPR, read from the TABLE_NAME_TAG ("tablename") address extension.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final String getTableName () throws URISyntaxException
+	{
+		return getAddr().getExtensionValue(TABLE_NAME_TAG);
+	}
+	
+	/**
+	 * Set the message id column name that is used by this EPR. Once set, a further call throws IllegalStateException.
+	 * 
+	 * @param columnName the column name for the message ID; null is rejected with IllegalArgumentException.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final void setMessageIdColumn (String columnName) throws URISyntaxException
+	{
+		if (columnName == null)
+			throw new IllegalArgumentException();
+		
+		if (messageIdSet)
+			throw new IllegalStateException("Message Id column already set.");
+		
+		getAddr().addExtension(MESSAGE_ID_COLUMN_TAG, columnName);
+		messageIdSet = true;
+	}
+	
+	/**
+	 * @return the message id column used by this EPR, read from the MESSAGE_ID_COLUMN_TAG ("message_id_column") address extension.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final String getMessageIdColumn () throws URISyntaxException
+	{
+		return getAddr().getExtensionValue(MESSAGE_ID_COLUMN_TAG);
+	}
+	
+	/**
+	 * Set the status column that is used by this EPR. Once set, a further call throws IllegalStateException.
+	 * 
+	 * @param statusColumn the status column; null is rejected with IllegalArgumentException.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final void setStatusColumn (String statusColumn) throws URISyntaxException
+	{
+		if (statusColumn == null)
+			throw new IllegalArgumentException();
+		
+		if (statusSet)
+			throw new IllegalStateException("Status column already set.");
+		
+		getAddr().addExtension(STATUS_COLUMN_TAG, statusColumn);
+		statusSet = true;
+	}
+	
+	/**
+	 * @return the status column name used by this EPR, read from the STATUS_COLUMN_TAG ("status_column") address extension.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final String getStatusColumn () throws URISyntaxException
+	{
+		return getAddr().getExtensionValue(STATUS_COLUMN_TAG);
+	}
+	
+	/**
+	 * Set the column that is used by this EPR to store message data. Once set, a further call throws IllegalStateException.
+	 * 
+	 * @param dataColumn the column name; null is rejected with IllegalArgumentException.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final void setDataColumn (String dataColumn) throws URISyntaxException
+	{
+		if (dataColumn == null)
+			throw new IllegalArgumentException();
+		
+		if (dataSet)
+			throw new IllegalStateException("Data column already set.");
+		
+		getAddr().addExtension(DATA_COLUMN_TAG, dataColumn);
+		dataSet = true;
+	}
+	
+	/**
+	 * @return the data column name used by this EPR, read from the DATA_COLUMN_TAG ("message_column") address extension.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final String getDataColumn () throws URISyntaxException
+	{
+		return getAddr().getExtensionValue(DATA_COLUMN_TAG);
+	}
+	
+	/**
+	 * Set the column used by this EPR to store the message insertion timestamp. Once set, a further call throws IllegalStateException.
+	 * 
+	 * @param timeColumn the column name; null is rejected with IllegalArgumentException.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final void setTimestampColumn (String timeColumn) throws URISyntaxException
+	{
+		if (timeColumn == null)
+			throw new IllegalArgumentException();
+		
+		if (timestampSet)
+			throw new IllegalStateException("Timestamp column already set.");
+		
+		getAddr().addExtension(TIMESTAMP_COLUMN_TAG, timeColumn);
+		timestampSet = true;
+	}
+	
+	/**
+	 * @return the timestamp column name used by this EPR, read from the TIMESTAMP_COLUMN_TAG ("insert_timestamp_column") address extension.
+	 * @throws URISyntaxException thrown if this EPR is malformed.
+	 */
+	
+	public final String getTimestampColumn () throws URISyntaxException
+	{
+		return getAddr().getExtensionValue(TIMESTAMP_COLUMN_TAG);
+	}
+	
+
+	
 	public static final URI type ()
 	{
 	    return _type;
@@ -210,6 +363,11 @@
 	private boolean passwordSet = false;
 	private boolean sqlSet = false;
 	private boolean driverSet = false;
+	private boolean tableNameSet = false;
+	private boolean messageIdSet = false;
+	private boolean statusSet = false;
+	private boolean dataSet = false;
+	private boolean timestampSet = false;
 	
 	private static URI _type;
 	

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/SimpleDataSource.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/SimpleDataSource.java	2006-12-18 22:10:26 UTC (rev 8390)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/persist/SimpleDataSource.java	2006-12-18 22:34:38 UTC (rev 8391)
@@ -27,6 +27,7 @@
 
 import javax.sql.DataSource;
 
+import org.apache.log4j.Logger;
 import org.jboss.soa.esb.helpers.ConfigTree;
 
 /**
@@ -46,21 +47,17 @@
 	private PrintWriter m_oPW = new PrintWriter(System.out);
 
 	private int m_iTO = 10;
-
 	private String m_sUrl, m_sUsr, m_sPwd;
-
 	private Connection m_oConn;
-
 	public static final String DRIVER = "driver-class";
-
 	public static final String URL = "connection-url";
-
 	public static final String USER = "user-name";
-
 	public static final String PASSWORD = "password";
+	
+	private static final Logger _logger = Logger.getLogger(SimpleDataSource.class);
 
-	private SimpleDataSource() {
-	}
+	// Disable default constructor
+	private SimpleDataSource() { }
 
 	/**
 	 * Obtain a DataSource by providing connection parameters. ConfigTree
@@ -118,7 +115,9 @@
 		m_sPwd = password;
 		try {
 			m_oConn = DriverManager.getConnection(m_sUrl, m_sUsr, m_sPwd);
-		} catch (Exception e) {
+		} catch (Exception e) 
+		{
+			_logger.error("Can't obtain datasource",e);
 			m_oConn = null;
 		}
 		return m_oConn;




More information about the jboss-svn-commits mailing list