[jboss-svn-commits] JBL Code SVN: r7493 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Nov 8 18:08:17 EST 2006
Author: estebanschifman
Date: 2006-11-08 18:08:14 -0500 (Wed, 08 Nov 2006)
New Revision: 7493
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
Log:
new SqlTableGatewayListener and minor change to FileGatewayListener
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2006-11-08 22:57:47 UTC (rev 7492)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2006-11-08 23:08:14 UTC (rev 7493)
@@ -75,7 +75,7 @@
// only sleep in between
if (bSleep)
try { Thread.sleep(_sleepBetweenPolls); }
- catch (InterruptedException e) { return; }
+ catch (InterruptedException e) { break; }
else
bSleep = true;
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2006-11-08 22:57:47 UTC (rev 7492)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2006-11-08 23:08:14 UTC (rev 7493)
@@ -0,0 +1,583 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
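+/**
+ * Gateway listener that polls a SQL table for pending rows, hands each row to a
+ * composer ("action") class that builds a Message, and delivers that Message
+ * through a Courier to the configured target service. The progress of every row
+ * is tracked in a status column (the "in process" field) of the polled table.
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */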
+public class SqlTableGatewayListener implements Runnable
+{
+
+ /**
+  * Store the collaborators, set the default polling interval and validate the
+  * configuration right away.
+  *
+  * @param commandListener controller used for attribute lookup, registry access
+  *                        and loop control
+  * @param config          configuration tree holding this listener's attributes
+  * @throws Exception if {@link #checkMyParms()} rejects the configuration
+  */
+ public SqlTableGatewayListener(GatewayListenerController commandListener, ConfigTree config)
+     throws Exception
+ {
+     this._config = config;
+     this._controller = commandListener;
+     // default interval in milliseconds - checkMyParms() may replace it
+     this._sleepBetweenPolls = 10L * 1000L;
+     checkMyParms();
+ } // __________________________________
+
+ /**
+  * Main polling loop.
+  * <p/>Registers the service (when a service name is set), then, while the
+  * controller asks to continue, polls the table and processes every candidate
+  * row. On exit the service is unregistered and the database connection is
+  * released, even if the loop ends with an unexpected runtime exception.
+  */
+ public void run()
+ {
+     if (null != _serviceName)
+     {
+         try { _controller.register(_config, _myEpr); }
+         catch (RegistryException e1)
+         { _logger.warn("unable to register service", e1); }
+     }
+
+     boolean bInterrupted = false;
+     try
+     {
+         boolean bSleep = false;
+         while (_controller.continueLooping())
+         {
+             // only sleep in between - not the first time
+             if (bSleep)
+             {
+                 try { Thread.sleep(_sleepBetweenPolls); }
+                 catch (InterruptedException e)
+                 {
+                     // remember the interrupt; the flag is restored after cleanup
+                     bInterrupted = true;
+                     break;
+                 }
+             }
+             else
+                 bSleep = true;
+
+             for (Map<String,Object> row : pollForCandidates())
+                 processRow(row);
+         }
+     }
+     finally
+     {
+         try
+         {
+             if (null != _serviceName)
+             {
+                 try { _controller.unRegister(_serviceCategory, _serviceName, _myEpr); }
+                 catch (RegistryException e1) { _logger.warn("unable to unRegister service", e1); }
+             }
+         }
+         finally
+         {
+             if (null != _dbConn)
+                 _dbConn.release();
+
+             // restore the interrupt status so that the owner of this thread can see it
+             if (bInterrupted)
+                 Thread.currentThread().interrupt();
+         }
+     }
+ } // ________________________________
+
+ /**
+  * Process a single candidate row: mark it as 'working', compose a Message,
+  * deliver it, and record the outcome in the status column.
+  *
+  * @param row column name to value map of the candidate row
+  */
+ private void processRow(Map<String,Object> row)
+ {
+     _currentRow = row;
+     // Try to mark as 'in process' - if unsuccessful, somebody else got it first
+     if (! changeStatusToWorking())
+         return;
+
+     Throwable thrown = null;
+     String text = null;
+     try
+     {
+         Object obj = _processMethod.invoke(_composer, new Object[] {_currentRow});
+         if (null == obj)
+         {
+             _logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+             // nothing to deliver - do not leave the row in 'working' state forever
+             changeStatusToError();
+             return;
+         }
+         _courier.deliver((org.jboss.soa.esb.message.Message)obj);
+     }
+     catch (InvocationTargetException e)
+     {
+         thrown = e;
+         text = "Problems invoking method <"+_processMethod.getName()+">";
+     }
+     catch (IllegalAccessException e)
+     {
+         thrown = e;
+         text = "Problems invoking method <"+_processMethod.getName()+">";
+     }
+     catch (ClassCastException e)
+     {
+         thrown = e;
+         text = "Action class method <"+_processMethod.getName()+"> returned a non Message object";
+     }
+     catch (CourierException e)
+     {
+         thrown = e;
+         text = "Courier <"+_courier.getClass().getName()+".deliver(Message) FAILED";
+     }
+     catch (RuntimeException e)
+     {
+         // e.g. IllegalArgumentException from invoke(): flag the row instead of
+         // killing the listener thread with the row stuck in 'working' state
+         thrown = e;
+         text = "Unexpected failure while processing row";
+     }
+
+     if (null == thrown)
+     {
+         if (_deleteAfterOK)
+             deleteCurrentRow();
+         else
+             changeStatusToDone();
+     }
+     else
+     {
+         // the logger already records the stack trace - no printStackTrace()
+         _logger.error(text, thrown);
+         changeStatusToError();
+     }
+ } // ________________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws Exception
+ {
+     // Third arg is null - Exception will be thrown if attribute is not found
+     _targetServiceCategory = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+     _targetServiceName = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+     _targetEpr = _controller.getEprByName(_targetServiceName);
+     if (null == _targetEpr)
+         throw new ConfigurationException("EPR <"+_targetServiceName+"> not found in registry");
+     _courier = CourierFactory.getCourier(_targetEpr);
+
+     // NOTE(review): _serviceCategory, _serviceName, _myEpr and _deleteAfterOK are
+     // not assigned anywhere in this class as shown, so run() never registers the
+     // service and never deletes rows after success - confirm whether the
+     // corresponding attributes should be read here.
+
+     // Polling interval (attribute is in seconds, field is in milliseconds)
+     String sAux = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+     if (! Util.isNullString(sAux))
+         try { _sleepBetweenPolls = 1000 * Long.parseLong(sAux); }
+         catch (NumberFormatException e)
+         { _logger.warn("Invalid poll latency - keeping default of "+(_sleepBetweenPolls/1000)); }
+
+     resolveComposerClass();
+
+     _driver = _controller.obtainAtt(_config, JDBCEpr.DRIVER_TAG , null);
+     _url = _controller.obtainAtt(_config, JDBCEpr.URL_TAG , null);
+     _user = _controller.obtainAtt(_config, JDBCEpr.USERNAME_TAG , null);
+     _password = _controller.obtainAtt(_config, JDBCEpr.PASSWORD_TAG , "");
+
+     _tableName = _controller.obtainAtt(_config, ListenerTagNames.SQL_TABLE_NAME_TAG, null);
+     _selectFields = _controller.obtainAtt(_config, ListenerTagNames.SQL_SELECT_FIELDS_TAG, null);
+     _keyFields = _controller.obtainAtt(_config, ListenerTagNames.SQL_KEY_FIELDS_TAG, null);
+     _inProcessField = _controller.obtainAtt(_config, ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG, null);
+
+     _where = _controller.obtainAtt(_config, ListenerTagNames.SQL_WHERE_CONDITION_TAG, "");
+     _orderBy = _controller.obtainAtt(_config, ListenerTagNames.SQL_ORDER_BY_TAG, "");
+     _inProcessVals = _controller.obtainAtt(_config, ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG, DEFAULT_IN_PROCESS_STATES);
+
+     // one character per ROW_STATE value, in declaration order
+     if (_inProcessVals.length() < 4)
+         throw new ConfigurationException("Parameter <"+ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG
+             +"> must be at least 4 characters long (PWED)");
+
+     // Names are trimmed so that "a, b" is understood as the columns "a" and "b"
+     _columns = _selectFields.split(",");
+     if (_columns.length < 1)
+         throw new ConfigurationException("Empty list of select fields");
+     Set<String> colSet = new HashSet<String>();
+     for (int i = 0; i < _columns.length; i++)
+     {
+         _columns[i] = _columns[i].trim();
+         if (_columns[i].length() < 1)
+             throw new ConfigurationException("Empty field name in the <"
+                 +ListenerTagNames.SQL_SELECT_FIELDS_TAG+"> list");
+         colSet.add(_columns[i]);
+     }
+
+     _keys = _keyFields.split(",");
+     if (_keys.length < 1)
+         throw new ConfigurationException("Empty list of keyFields");
+     for (int i = 0; i < _keys.length; i++)
+     {
+         _keys[i] = _keys[i].trim();
+         if (colSet.contains(_keys[i]))
+             continue;
+
+         StringBuilder sb = new StringBuilder()
+             .append("All key field names in the <").append(ListenerTagNames.SQL_KEY_FIELDS_TAG)
+             .append("> attribute must be in the <").append(ListenerTagNames.SQL_SELECT_FIELDS_TAG)
+             .append("> list - '").append(_keys[i]).append("' is not there");
+         throw new ConfigurationException(sb.toString());
+     }
+     prepareStatements();
+ } //________________________________
+
+ /**
+  * Prepare the scan, select-for-update, update and delete statements on the
+  * listener's database connection.
+  *
+  * @throws ConfigurationException wrapping any problem met while obtaining the
+  *         connection or preparing a statement
+  */
+ protected void prepareStatements() throws ConfigurationException
+ {
+     try
+     {
+         _PSscan      = getDbConn().prepareStatement(scanStatement());
+         _PSsel4U     = getDbConn().prepareStatement(selectForUpdStatement());
+         _PSupdate    = getDbConn().prepareStatement(updateStatement());
+         _PSdeleteRow = getDbConn().prepareStatement(deleteStatement());
+     }
+     catch (Exception cause)
+     {
+         throw new ConfigurationException(cause);
+     }
+ } //________________________________
+
+
+ protected void resolveComposerClass() throws Exception
+ {
+ // Look for first "action" element - only first one will be used
+ String tagName = ListenerTagNames.ACTION_ELEMENT_TAG;
+ ConfigTree actionElement = _config.getFirstChild(tagName);
+ String sProcessMethod = null;
+ if (null!=actionElement)
+ { // class attribute
+ _composerName = _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_TAG,null);
+ _composerClass = Class.forName(_composerName);
+ Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+ _composer= oConst.newInstance(_config);
+ tagName = ListenerTagNames.PROCESS_METHOD_TAG;
+ sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+ }
+ else
+ {
+ _composerName = PackageRowContents.class.getName();
+ _composerClass= PackageRowContents.class;
+ _composer = new PackageRowContents();
+ sProcessMethod = "process";
+ }
+
+ _processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {Object.class});
+ } //________________________________
+
+ /**
+  * Run the scan statement and collect every returned row.
+  * <p/>Problems are logged and not propagated; rows read before the problem
+  * occurred are still returned.
+  *
+  * @return one map per row (column name to value), possibly empty, never null
+  */
+ protected List<Map<String,Object>> pollForCandidates()
+ {
+     List<Map<String,Object>> oResults = new ArrayList<Map<String,Object>>();
+     PreparedStatement PS = null;
+     ResultSet RS = null;
+     try
+     {
+         JdbcCleanConn oConn = getDbConn();
+         PS = oConn.prepareStatement(scanStatement());
+         RS = oConn.execQueryWait(PS, 1);
+         while (RS.next())
+         {
+             Map<String,Object> row = new HashMap<String,Object>();
+             int iCurr = 0;
+
+             // result set columns come back in the order of the select list
+             for (String sColName : _columns)
+                 row.put(sColName, RS.getObject(++iCurr));
+
+             oResults.add(row);
+         }
+     }
+     catch (Exception e)
+     {
+         _logger.warn("Some triggers might not have been returned", e);
+     }
+     finally
+     {
+         // a statement is prepared on every poll - close it, or cursors pile up
+         if (null != RS)
+             try { RS.close(); }
+             catch (Exception e) { _logger.debug("Unable to close scan result set", e); }
+         if (null != PS)
+             try { PS.close(); }
+             catch (Exception e) { _logger.debug("Unable to close scan statement", e); }
+     }
+     _logger.info("Returning " + oResults.size() + " rows.");
+     return oResults;
+ } //________________________________
+
+ /**
+ * Obtain the database connection built from the parameter info.
+ * The connection is created on first use; later calls return the same instance.
+ * @return The (cached) connection
+ * @throws Exception - if problems are encountered
+ */
+ protected JdbcCleanConn getDbConn() throws Exception
+ {
+     // already built on an earlier call - hand back the same instance
+     if (null != _dbConn)
+         return _dbConn;
+
+     DataSource source = new SimpleDataSource (_driver, _url, _user, _password);
+     _dbConn = new JdbcCleanConn(source);
+     return _dbConn;
+ } //________________________________
+
+ /**
+ * Assemble the SQL statement to scan (poll) the table
+ * @return - The resulting SQL statement
+ */
+ protected String scanStatement()
+ {
+     StringBuilder sb = new StringBuilder ()
+         .append("select ").append(_selectFields)
+         .append(" from ") .append(_tableName)
+         .append(" where ");
+
+     // The configured condition is wrapped in parentheses: without them an OR
+     // inside it would take the status filter as part of its last operand
+     // (AND binds tighter than OR) and rows in any state would be selected.
+     if (! Util.isNullString(_where))
+         sb.append("(").append(_where).append(") and ");
+
+     // first character of the "in process" values stands for the pending state
+     String sLike = _inProcessVals.substring(0,1).toUpperCase();
+     sb.append("upper(").append(_inProcessField)
+       .append(") like '").append(sLike).append("%'");
+
+     if (! Util.isNullString(_orderBy))
+         sb.append(" order by ").append(_orderBy);
+     return sb.toString();
+ } //________________________________
+
+ /**
+ * Assemble the SQL statement to update the field
+ * in the "inProcessField" parameter
+ *
+ * in the table row uniquely identified by the list of fields
+ * in the "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String updateStatement()
+ {
+     StringBuilder sql = new StringBuilder ();
+     sql.append("update ").append(_tableName);
+     sql.append(" set ").append(_inProcessField).append(" = ? where ");
+
+     // one "key = ?" term per key field, joined with " and "
+     for (int i = 0; i < _keys.length; i++)
+     {
+         if (i > 0)
+             sql.append(" and ");
+         sql.append(_keys[i]).append(" = ?");
+     }
+     return sql.toString();
+ } //________________________________
+
+ /**
+ * Assemble the SQL "select for update" statement
+ * for the "inProcessField" parameter
+ *
+ * in the table row uniquely identified by the list of fields
+ * in the "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String selectForUpdStatement()
+ {
+     StringBuilder sql = new StringBuilder ();
+     sql.append("select ").append(_inProcessField);
+     sql.append(" from ").append(_tableName);
+     sql.append(" where ");
+
+     // one "key = ?" term per key field, joined with " and "
+     for (int i = 0; i < _keys.length; i++)
+     {
+         if (i > 0)
+             sql.append(" and ");
+         sql.append(_keys[i]).append(" = ?");
+     }
+     sql.append(" for update");
+     return sql.toString();
+ } //________________________________
+
+ /**
+ * Assemble the SQL statement to delete the current row
+ * in the table row uniquely identified by the list of fields
+ * in the "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String deleteStatement()
+ {
+     StringBuilder sql = new StringBuilder ();
+     sql.append("delete from ").append(_tableName).append(" where ");
+
+     // one "key = ?" term per key field, joined with " and "
+     for (int i = 0; i < _keys.length; i++)
+     {
+         if (i > 0)
+             sql.append(" and ");
+         sql.append(_keys[i]).append(" = ?");
+     }
+     return sql.toString();
+ } //________________________________
+
+ /**
+ * Try to delete 'current row' from polled table
+ * @return true if row deletion was successful - false otherwise
+ */
+ protected boolean deleteCurrentRow()
+ {
+     JdbcCleanConn conn;
+     try { conn = getDbConn(); }
+     catch (Exception e)
+     {
+         _logger.error("Unable to get DB connection.", e);
+         throw new IllegalStateException("Unable to get DB connection.", e);
+     }
+
+     // bind the key values of the current row, in key field order
+     try
+     {
+         int position = 0;
+         for (String key : _keys)
+             _PSdeleteRow.setObject (++position, _currentRow.get(key));
+     }
+     catch (Exception e)
+     {
+         _logger.error("Unexpected exception.", e);
+         return false;
+     }
+
+     try
+     {
+         conn.execUpdWait(_PSdeleteRow, 5);
+         conn.commit();
+         return true;
+     }
+     catch (Exception e)
+     {
+         _logger.error("Delete row has failed. Rolling back!!", e);
+     }
+
+     // only reached when the delete or its commit failed
+     try { conn.rollback(); }
+     catch (Exception e)
+     {
+         _logger.error("Unable to rollback delete row", e);
+     }
+     return false;
+ } //________________________________
+
+ protected String getStatus(ROW_STATE p_oState)
+ {
+ int iPos = p_oState.ordinal();
+ return _inProcessVals.substring(iPos, ++iPos);
+ } //________________________________
+
+ protected boolean changeStatusToWorking()
+ {
+ return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+ } //________________________________
+
+ protected boolean changeStatusToDone()
+ {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+ } //________________________________
+
+ protected boolean changeStatusToError()
+ {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+ } //________________________________
+
+ /**
+  * Change the status column of the current row from one state to another.
+  * <p/>The row is first read with the "select for update" statement; the
+  * update is applied and committed only if the row is still in
+  * <code>fromState</code>. Every other outcome ends with a rollback, so that
+  * whatever the select acquired is released before returning.
+  *
+  * @param fromState state the row is expected to be in
+  * @param toState   state to move the row to
+  * @return true if the row was updated and committed - false otherwise
+  * @throws IllegalStateException if no database connection can be obtained
+  */
+ protected boolean changeStatus(ROW_STATE fromState, ROW_STATE toState)
+ {
+     JdbcCleanConn dbConnection = null;
+
+     try { dbConnection = getDbConn(); }
+     catch (Exception e)
+     {
+         _logger.error("Unable to get DB connection.", e);
+         throw new IllegalStateException("Unable to get DB connection.", e);
+     }
+
+     try
+     {
+         int iParm = 1;
+         for (String sColName : _keys)
+         {
+             Object oVal = _currentRow.get(sColName);
+             _PSsel4U.setObject (iParm, oVal);
+             // parameters are +1 in update statement (first one is the new status)
+             _PSupdate.setObject (++iParm, oVal);
+         }
+     }
+     catch (Exception e)
+     {
+         _logger.error("Unexpected exception.", e);
+         return false;
+     }
+
+     ResultSet resultSet = null;
+     try
+     {
+         resultSet = dbConnection.execQueryWait(_PSsel4U, 5);
+
+         if (! resultSet.next())
+             _logger.error("Row status change to " + toState + " has failed. Rolling back!!");
+         else
+         {
+             // a null or empty status can never match the expected state
+             String sCurrent = resultSet.getString(1);
+             String sOldStatus = (null == sCurrent || sCurrent.length() < 1)
+                 ? "" : sCurrent.substring(0, 1);
+
+             if (sOldStatus.equalsIgnoreCase(getStatus(fromState)))
+             {
+                 _PSupdate.setString(1, getStatus(toState));
+                 dbConnection.execUpdWait(_PSupdate, 5);
+                 dbConnection.commit();
+
+                 if (_logger.isDebugEnabled())
+                     _logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
+
+                 return true;
+             }
+             // fall through to the rollback below: the select for update must
+             // not stay pending on the connection after we give up on the row
+             _logger.warn("Cannot change row state from " + fromState + " to " + toState + ". Row not in state " + fromState);
+         }
+     }
+     catch (Exception e)
+     {
+         _logger.error("Row status change to " + toState + " has failed. Rolling back!!", e);
+     }
+     finally
+     {
+         if (null != resultSet)
+             try { resultSet.close(); }
+             catch (Exception e) { _logger.debug("Unable to close result set", e); }
+     }
+
+     try { dbConnection.rollback(); }
+     catch (Exception e)
+     {
+         _logger.error("Unable to rollback row status change to " + toState.name(), e);
+     }
+
+     return false;
+ } //________________________________
+
+/**
+ * Default gateway action for SQL table rows
+ * <p/>It will just drop the result set contents into a Message
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+ private static class PackageRowContents
+ {
+     /**
+      * Wrap the row data in a new Message, stored as the property named by
+      * ListenerTagNames.SQL_ROW_DATA_TAG.
+      *
+      * @param obj the row data: a Map of column name to value as supplied by
+      *            the listener's polling loop (a List is accepted as well)
+      * @return a new Message carrying the row data as a property
+      * @throws Exception if obj is neither a Map nor a List
+      */
+     public Message process (Object obj) throws Exception
+     {
+         // The listener passes each row as a Map<String,Object>; requiring a
+         // List here made the default composer reject every row.
+         if (! (obj instanceof Map || obj instanceof List))
+             throw new IllegalArgumentException ("Object must be instance of Map or List");
+
+         Message message = MessageFactory.getInstance().getMessage();
+         message.getProperties().setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);
+         return message;
+     }
+ } //____________________________________________________
+
+ // Logger shared by all instances of this listener
+ protected final static Logger _logger = Logger.getLogger(SqlTableGatewayListener.class);
+
+ // Configuration tree and controller received in the constructor
+ protected ConfigTree _config;
+ protected GatewayListenerController _controller;
+ protected long _sleepBetweenPolls; // milliseconds
+
+ // Own service identity, used by run() to register/unRegister.
+ // NOTE(review): not assigned anywhere in this class as shown - confirm.
+ protected String _serviceCategory, _serviceName;
+ // Target service attributes read in checkMyParms()
+ protected String _targetServiceCategory ,_targetServiceName;
+ // _targetEpr is looked up by target service name in checkMyParms()
+ protected EPR _myEpr ,_targetEpr;
+
+ // Composer (action) class, instance and method, set by resolveComposerClass()
+ protected String _composerName;
+ protected Class _composerClass;
+ protected Object _composer;
+ protected Method _processMethod;
+
+ // Courier obtained for _targetEpr; run() uses it to deliver composed messages
+ protected Courier _courier;
+
+ // JDBC connection parameters
+ protected String _driver ,_url ,_user ,_password;
+ // Polled table, comma separated select list and comma separated key fields
+ protected String _tableName ,_selectFields, _keyFields;
+ // Optional where condition and order by clause for the scan statement
+ protected String _where ,_orderBy;
+ // Status column name, and the status codes: one character per ROW_STATE
+ protected String _inProcessField ,_inProcessVals;
+ // When true run() deletes a row after success instead of marking it Done.
+ // NOTE(review): not assigned anywhere in this class as shown - confirm.
+ protected boolean _deleteAfterOK;
+
+ // _selectFields and _keyFields split on commas
+ protected String[] _columns ,_keys;
+ // Statements prepared in prepareStatements()
+ protected PreparedStatement _PSscan ,_PSsel4U ,_PSupdate ,_PSdeleteRow;
+ // Connection created on first use by getDbConn(), released at the end of run()
+ protected JdbcCleanConn _dbConn;
+ // Row being processed: column name to value
+ protected Map<String,Object>_currentRow;
+
+ // Declaration order matters: getStatus() uses ordinal() as the index into _inProcessVals
+ public static enum ROW_STATE {Pending ,Working ,Error ,Done };
+ // Default status codes: P(ending), W(orking), E(rror), D(one)
+ public static final String DEFAULT_IN_PROCESS_STATES = "PWED";
+} //____________________________________________________________________________
More information about the jboss-svn-commits
mailing list