[jboss-svn-commits] JBL Code SVN: r7493 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Nov 8 18:08:17 EST 2006


Author: estebanschifman
Date: 2006-11-08 18:08:14 -0500 (Wed, 08 Nov 2006)
New Revision: 7493

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
Log:
new SqlTableGatewayListener and minor change to FileGatewayListener

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2006-11-08 22:57:47 UTC (rev 7492)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2006-11-08 23:08:14 UTC (rev 7493)
@@ -75,7 +75,7 @@
 			//  only sleep in between
 			if (bSleep)
 				try { Thread.sleep(_sleepBetweenPolls); }
-				catch (InterruptedException e) { return; }
+				catch (InterruptedException e) { break; }
 			else
 				bSleep = true;
 

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java	2006-11-08 22:57:47 UTC (rev 7492)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java	2006-11-08 23:08:14 UTC (rev 7493)
@@ -0,0 +1,583 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+public class SqlTableGatewayListener implements Runnable
+{
+
+    public SqlTableGatewayListener(GatewayListenerController commandListener, ConfigTree config) 
+    	throws Exception 
+    {
+    	_config		= config;
+    	_controller	= commandListener;
+    	_sleepBetweenPolls = 10000;			//  milliseconds
+        checkMyParms();
+    } // __________________________________
+
+	public void run() 
+	{
+		if (null!=_serviceName)
+			try {	_controller.register(_config,_myEpr); }
+			catch (RegistryException e1) 
+				{	_logger.warn("unable to register service",e1); }
+
+		boolean bSleep = false;
+		while (_controller.continueLooping()) 
+        {
+			//  only sleep in between - not the first time
+			if (bSleep)
+				try { Thread.sleep(_sleepBetweenPolls); }
+				catch (InterruptedException e) { break; }
+			else
+				bSleep = true;
+
+			for (Map<String,Object> row : pollForCandidates())
+            {
+				_currentRow	= row;
+        		// Try to mark as 'in process' - if unsuccessful, somebody else got it first
+        		if (! changeStatusToWorking())
+        			continue;
+
+        		Throwable thrown = null;
+        		String text = null;
+	            try
+	            {
+	            	Object obj = _processMethod.invoke(_composer,new Object[] {_currentRow} );
+	        		if (null==obj)
+	        		{
+	        			_logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+	        			continue;
+	        		}
+	        		_courier.deliver((org.jboss.soa.esb.message.Message)obj);
+	            }
+
+		            catch (InvocationTargetException e)	
+		            {
+		            	thrown = e;
+		            	text = "Problems invoking method <"+_processMethod.getName()+">";
+		            	
+		            }
+		            catch (IllegalAccessException e)	
+		            {	
+		            	thrown = e;
+		            	text = "Problems invoking method <"+_processMethod.getName()+">";
+		            }
+	        		catch (ClassCastException e)
+	        		{
+	        			thrown = e;
+	        			text = "Action class method <"+_processMethod.getName()+"> returned a non Message object";
+	        		}
+	        		catch (CourierException e)
+	        		{
+	        			thrown = e;
+	        			text = "Courier <"+_courier.getClass().getName()+".deliver(Message) FAILED";
+	        		}
+        		
+        		if (null==thrown)
+        		{
+        			if (_deleteAfterOK)
+        				deleteCurrentRow();
+        			else
+        				changeStatusToDone();
+        		}
+        		else
+        		{
+        			thrown.printStackTrace();
+        			_logger.error(text,thrown);
+        			changeStatusToError();
+        		}
+            }
+        }
+        
+		if (null!=_serviceName)
+			try { _controller.unRegister(_serviceCategory, _serviceName,_myEpr); }
+			catch (RegistryException e1){	_logger.warn("unable to unRegister service",e1); }
+
+		if (null!=_dbConn)
+			_dbConn.release();
+    } // ________________________________
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws Exception -
+     *             if mandatory atts are not right or actionClass not in
+     *             classpath
+     */
+    protected void checkMyParms() throws Exception 
+    {
+        // Third arg is null - Exception will be thrown if attribute is not found
+    	_targetServiceCategory	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+    	_targetServiceName	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+    	_targetEpr		= _controller.getEprByName(_targetServiceName);
+    	if (null==_targetEpr)
+        	throw new ConfigurationException("EPR <"+_targetServiceName+"> not found in registry");
+        _courier		= CourierFactory.getCourier(_targetEpr);
+
+        // Polling interval
+        String sAux = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+        if (! Util.isNullString(sAux))
+	        try { _sleepBetweenPolls = 1000 * Long.parseLong(sAux); }
+	        catch (NumberFormatException e)
+	        	{ _logger.warn("Invalid poll latency - keeping default of "+(_sleepBetweenPolls/1000)); }
+        
+        resolveComposerClass();
+
+        _driver	= _controller.obtainAtt(_config, JDBCEpr.DRIVER_TAG	, null);
+        _url	= _controller.obtainAtt(_config, JDBCEpr.URL_TAG	, null);
+        _user	= _controller.obtainAtt(_config, JDBCEpr.USERNAME_TAG	, null);
+        _password = _controller.obtainAtt(_config, JDBCEpr.PASSWORD_TAG	, "");
+
+        _tableName = _controller.obtainAtt(_config, ListenerTagNames.SQL_TABLE_NAME_TAG,null);
+    	_selectFields = _controller.obtainAtt(_config, ListenerTagNames.SQL_SELECT_FIELDS_TAG,null);
+    	_keyFields = _controller.obtainAtt(_config, ListenerTagNames.SQL_KEY_FIELDS_TAG,null);
+    	_inProcessField = _controller.obtainAtt(_config, ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG,null);
+	  
+    	_where	 = _controller.obtainAtt(_config, ListenerTagNames.SQL_WHERE_CONDITION_TAG,"");
+    	_orderBy = _controller.obtainAtt(_config, ListenerTagNames.SQL_ORDER_BY_TAG,"");
+    	_inProcessVals = _controller.obtainAtt(_config, ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG,DEFAULT_IN_PROCESS_STATES);
+
+    	if (_inProcessVals.length()<4)
+		  throw new Exception("Parameter <"+ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG
+				  +"> must be at least 4 characters long (PWED)");
+
+    	_columns = _selectFields.split(",");
+    	if (_columns.length < 1)
+			throw new Exception("Empty list of select fields");
+    	
+    	Set<String>colSet = new HashSet<String>(Arrays.asList(_columns));
+    	_keys	 = _keyFields.split(",");
+    	if (_keys.length < 1)
+			throw new Exception("Empty list of keyFields");
+    	for(String currKey : _keys)
+    	{
+    		if (colSet.contains(currKey))
+    			continue;
+    		else
+    		{	StringBuilder sb = new StringBuilder()
+    				.append("All key field names in the <").append(ListenerTagNames.SQL_KEY_FIELDS_TAG)
+    				.append("> attribute must be in the ").append(ListenerTagNames.SQL_SELECT_FIELDS_TAG)
+    				.append("list - '").append(currKey).append("' is not there");
+    				;
+    			throw new ConfigurationException(sb.toString());
+    		}
+    	}
+    	prepareStatements();
+    } //________________________________
+
+    protected void prepareStatements() throws ConfigurationException
+    {
+    	try
+    	{
+    		_PSscan		= getDbConn().prepareStatement(scanStatement());
+    		_PSsel4U	= getDbConn().prepareStatement(selectForUpdStatement());
+    		_PSupdate	= getDbConn().prepareStatement(updateStatement());
+    		_PSdeleteRow= getDbConn().prepareStatement(deleteStatement());
+    		return;
+    	}
+    	catch (Exception e)
+    		{throw new ConfigurationException(e); }
+    } //________________________________
+
+    
+    protected void resolveComposerClass() throws Exception
+    {
+        // Look for first "action" element - only first one will be used
+        String tagName = ListenerTagNames.ACTION_ELEMENT_TAG;
+        ConfigTree actionElement = _config.getFirstChild(tagName);
+        String sProcessMethod =  null;
+        if (null!=actionElement)
+        {	// class attribute
+            _composerName	= _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_TAG,null);
+            _composerClass = Class.forName(_composerName);
+        	Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+        	_composer= oConst.newInstance(_config);
+        	tagName	= ListenerTagNames.PROCESS_METHOD_TAG;
+        	sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+        }
+        else
+        {
+        	_composerName = PackageRowContents.class.getName();
+        	_composerClass= PackageRowContents.class;
+        	_composer	= new PackageRowContents();
+        	sProcessMethod = "process";
+        }
+
+    	_processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {Object.class});
+    } //________________________________
+
+	protected List<Map<String,Object>> pollForCandidates() 
+	{
+		JdbcCleanConn	oConn	 = null;
+		List<Map<String,Object>> oResults = new ArrayList<Map<String,Object>>();
+		try
+		{
+			oConn = getDbConn();
+			String sScan = scanStatement();
+
+			PreparedStatement PS = oConn.prepareStatement(sScan);
+			ResultSet RS = oConn.execQueryWait(PS,1);
+			while (RS.next()) 
+			{	
+                Map<String,Object> row = new HashMap<String,Object>();
+				int iCurr = 0;
+
+                for (String sColName : _columns)
+                    row.put(sColName,RS.getObject(++iCurr));
+
+				oResults.add(row);
+			}
+		}
+		catch (Exception e)
+		{
+			_logger.warn("Some triggers might not have been returned",e);
+		}
+        _logger.info("Returning " + oResults.size() + " rows.");
+		return oResults;
+	} //________________________________
+
+	/**
+	 * Obtain a new database connection with parameter info
+	 * @return A new connection
+	 * @throws Exception - if problems are encountered
+	 */
+	protected JdbcCleanConn getDbConn() throws Exception
+	{
+		if (null==_dbConn)
+		{
+			DataSource oDS = new SimpleDataSource (_driver,_url,_user, _password);
+			_dbConn	= new JdbcCleanConn(oDS);
+		}
+		return _dbConn;
+	} //________________________________
+	
+	/**
+	 * Assemble the SQL statement to scan (poll) the table
+	 * @return - The resulting SQL statement
+	 */
+	protected String scanStatement()
+	{	
+		StringBuilder sb = new StringBuilder ()
+			.append("select ").append(_selectFields)
+			.append(" from ") .append(_tableName);
+
+		boolean bWhere =  ! Util.isNullString(_where);
+		if (bWhere)
+			sb.append(" where ").append(_where);
+		sb.append((bWhere) ? " and " : " where ");
+
+		String sLike = _inProcessVals.substring(0,1).toUpperCase();
+		sb.append(" upper(").append(_inProcessField)
+			.append(") like '").append(sLike).append("%'");
+		
+		if (! Util.isNullString(_orderBy))
+			sb.append(" order by ").append(_orderBy);
+		return sb.toString();
+	} //________________________________
+
+	/**
+	 * Assemble the SQL statement to update the field
+	 * in the "inProcessField" parameter
+	 *  
+	 * in the table row uniquely identified by the list of fields 
+	 * in the "keyFields" parameter
+	 * 
+	 * @return - The resulting SQL statement
+	 */
+	protected String updateStatement()
+	{	
+		StringBuilder sb = new StringBuilder ()
+			.append("update ").append(_tableName)
+			.append(" set ")  .append(_inProcessField).append(" = ? where ")
+		;
+		int iCurr = 0;
+		for(String sCurr : _keys)
+		{	if (iCurr++ > 0)
+				sb.append(" and ");
+			sb.append(sCurr).append(" = ?");
+		}		
+		return sb.toString();
+	} //________________________________
+
+	/**
+	 * Assemble the SQL "select for update" statement
+	 * for the "inProcessField" parameter
+	 *  
+	 * in the table row uniquely identified by the list of fields 
+	 * in the "keyFields" parameter
+	 * 
+	 * @return - The resulting SQL statement
+	 */
+	protected String selectForUpdStatement()
+	{	
+		StringBuilder sb = new StringBuilder ()
+			.append("select ").append(_inProcessField)
+			.append(" from ") .append(_tableName)
+			.append(" where ")
+		;
+		int iCurr = 0;
+		for(String sCurr : _keys)
+		{	if (iCurr++ > 0)
+				sb.append(" and ");
+			sb.append(sCurr).append(" = ?");
+		}		
+		return sb.append(" for update").toString();
+	} //________________________________
+
+	/**
+	 * Assemble the SQL statement to delete the current row
+	 * in the table row uniquely identified by the list of fields 
+	 * in the "keyFields" parameter
+	 * 
+	 * @return - The resulting SQL statement
+	 */
+	protected String deleteStatement()
+	{	
+		StringBuilder sb = new StringBuilder ()
+			.append("delete from ").append(_tableName).append(" where ")
+		;
+		int iCurr = 0;
+		for(String sCurr : _keys)
+		{	if (iCurr++ > 0)
+				sb.append(" and ");
+			sb.append(sCurr).append(" = ?");
+		}		
+		return sb.toString();
+	} //________________________________
+
+	/**
+	 * Try to delete 'current row' from polled table
+	 * @return true if row deletion was successful - false otherwise
+	 */
+    protected boolean deleteCurrentRow() 
+    {
+        JdbcCleanConn dbConnection = null;
+        
+        try { dbConnection = getDbConn(); } 
+        catch (Exception e) 
+        {
+            _logger.error("Unable to get DB connection.", e);
+            throw new IllegalStateException("Unable to get DB connection.", e);
+        }
+        
+        try 
+        {
+            int iParm=1;
+            for (String sColName : _keys) 
+                _PSdeleteRow.setObject (iParm++,_currentRow.get(sColName));
+
+            try 
+            {
+                dbConnection.execUpdWait(_PSdeleteRow, 5);
+                dbConnection.commit();
+                return true;
+            } 
+            catch(Exception e) 
+            {
+                _logger.error("Delete row has failed.  Rolling back!!", e);
+            }
+            
+            try { dbConnection.rollback(); }
+            catch (Exception e) 
+            {
+                _logger.error("Unable to rollback delete row", e);
+            }
+        } 
+        catch (Exception e) { _logger.error("Unexpected exception.", e); } 
+        return false;
+    } //________________________________
+    
+    protected String getStatus(ROW_STATE p_oState) 
+    {
+        int iPos = p_oState.ordinal();
+        return _inProcessVals.substring(iPos, ++iPos);
+    } //________________________________
+
+    protected boolean changeStatusToWorking() 
+    {
+        return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+    } //________________________________
+    
+    protected boolean changeStatusToDone() 
+    {
+        return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+    } //________________________________
+    
+    protected boolean changeStatusToError() 
+    {
+        return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+    } //________________________________
+
+    protected boolean changeStatus(ROW_STATE fromState, ROW_STATE toState) 
+    {
+        JdbcCleanConn dbConnection = null;
+        
+        try { dbConnection = getDbConn(); } 
+        catch (Exception e) 
+        {
+            _logger.error("Unable to get DB connection.", e);
+            throw new IllegalStateException("Unable to get DB connection.", e);
+        }
+        
+        try 
+        {
+            int iParm=1;
+            for (String sColName : _keys) 
+            {   
+                Object oVal = _currentRow.get(sColName);
+                _PSsel4U.setObject (iParm  ,oVal);
+                // parameters are +1 in update statement
+                _PSupdate.setObject   (++iParm,oVal);
+            }
+
+            try 
+            {
+                ResultSet resultSet = dbConnection.execQueryWait(_PSsel4U, 5);
+                
+                if (resultSet.next()) 
+                {
+                    String sOldStatus = resultSet.getString(1).substring(0, 1);
+                 
+                    if (sOldStatus.equalsIgnoreCase(getStatus(fromState))) 
+                    {
+                        _PSupdate.setString(1, getStatus(toState));
+                        dbConnection.execUpdWait(_PSupdate, 5);
+                        dbConnection.commit();
+
+                        if(_logger.isDebugEnabled()) 
+                            _logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
+                        
+                        return true;
+                    } 
+                    else 
+                    {
+                        _logger.warn("Cannot change row state from " + fromState + " to " + toState + ".  Row not in state " + fromState);
+                        return false;
+                    }
+                }
+                _logger.error("Row status change to " + toState + " has failed.  Rolling back!!");
+            } 
+            catch(Exception e) 
+            {
+                _logger.error("Row status change to " + toState + " has failed.  Rolling back!!", e);
+            }
+            
+            try { dbConnection.rollback(); }
+            catch (Exception e) 
+            {
+                _logger.error("Unable to rollback row status change to " + fromState.name(), e);
+            }
+        } 
+        catch (Exception e) { _logger.error("Unexpected exception.", e); } 
+
+        return false;
+    } //________________________________
+
+/**
+ * Default gateway action for SQL table rows
+ * <p/>It will just drop the result set contents into a Message
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+    private static class PackageRowContents
+    {
+    	public Message process (Object obj) throws Exception
+    	{
+    		if (! (obj instanceof List))
+    			throw new Exception ("Object must be instance of List");
+
+    		Message message = MessageFactory.getInstance().getMessage();
+    		message.getProperties().setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);    		
+    		return message;
+    	}    	
+    } //____________________________________________________
+
+    protected final static Logger _logger = Logger.getLogger(SqlTableGatewayListener.class);
+
+    protected ConfigTree 		_config;
+    protected GatewayListenerController _controller;
+    protected long 				_sleepBetweenPolls;   //  milliseconds
+
+    protected String			_serviceCategory, _serviceName;
+    protected String			_targetServiceCategory ,_targetServiceName;
+    protected EPR				_myEpr		,_targetEpr;
+
+    protected String			_composerName;
+    protected Class 			_composerClass;
+    protected Object			_composer;
+    protected Method			_processMethod;
+    
+    protected Courier			_courier;
+    
+    protected String			_driver		,_url		,_user	,_password;
+    protected String			_tableName	,_selectFields, _keyFields;
+    protected String			_where		,_orderBy;
+    protected String			_inProcessField			,_inProcessVals;
+    protected boolean			_deleteAfterOK;
+
+    protected String[]			_columns	,_keys;
+    protected PreparedStatement _PSscan		,_PSsel4U	,_PSupdate	,_PSdeleteRow;
+    protected JdbcCleanConn		_dbConn;
+    protected Map<String,Object>_currentRow;
+    
+    public static enum ROW_STATE {Pending ,Working ,Error ,Done };
+    public static final String DEFAULT_IN_PROCESS_STATES = "PWED";
+} //____________________________________________________________________________




More information about the jboss-svn-commits mailing list