[jboss-svn-commits] JBL Code SVN: r6239 - in labs/jbossesb/workspace/tfennelly/product/core: common/src/org/jboss/soa/esb/helpers/persist listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/tests/src/org/jboss/soa/esb/util

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Sep 15 11:42:01 EDT 2006


Author: tfennelly
Date: 2006-09-15 11:41:52 -0400 (Fri, 15 Sep 2006)
New Revision: 6239

Modified:
   labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Log:
cleaning up the SQLTable poller stuff.

Modified: labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -47,15 +47,17 @@
 	}
 
 	public void commit() throws Exception {
-		if (null != m_conn)
+		if (null != m_conn) {
 			m_conn.commit();
+        }
 	}
 
 	public void rollback() throws Exception {
-		if (null != m_conn)
+		if (null != m_conn) {
 			m_conn.rollback();
+        }
 	}
-
+    
 	public void release() {
 		if (null != m_conn) {
 			try {
@@ -79,23 +81,29 @@
 
 	public PreparedStatement prepareStatement(String p_sSt, int p_i1, int p_i2)
 			throws Exception {
-		if (null == m_conn)
-			connect();
+        if (null == m_conn) {
+            connect();
+        }
 		PreparedStatement PS = m_conn.prepareStatement(p_sSt, p_i1, p_i2);
 		m_olPrepSt.add(PS);
 		return PS;
 	} // __________________________________
 
 	public PreparedStatement prepareStatement(String p_sSt) throws Exception {
-		if (null == m_conn)
+		if (null == m_conn) {
 			connect();
+        }
+        
 		PreparedStatement PS = m_conn.prepareStatement(p_sSt);
 		m_olPrepSt.add(PS);
 		return PS;
 	} // __________________________________
 
-	public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry)
-			throws Exception {
+	public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
+        if (null == m_conn) {
+            connect();
+        }
+        
 		Exception eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
 		for (int i1 = 0; i1 < iQtry; i1++) {
@@ -113,8 +121,11 @@
 		throw eRet;
 	} // __________________________________
 
-	public void execUpdWait(PreparedStatement p_PS, int p_iQtry)
-			throws Exception {
+	public void execUpdWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
+        if (null == m_conn) {
+            connect();
+        }
+        
 		Exception eRet = null;
 		int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
 		for (int i1 = 0; i1 < iQtry; i1++) {
@@ -134,6 +145,10 @@
 	} // __________________________________
 
 	private void connect() throws Exception {
+        if(m_conn != null) {
+            return;
+        }
+        
 		Exception eRet = null;
 		for (int i1 = 0; i1 < 5; i1++) {
 			try {

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -24,6 +24,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * <p/>
@@ -36,7 +37,9 @@
  */
 public abstract class AbstractFileAction implements ActionProcessor {
     
-    public static class Params {
+    public static class Params implements Serializable {
+        private static final long serialVersionUID = 1L;
+
         public boolean bPostDelete;
 
         public File oInpF, oWrkF, oErrF, oDoneF;
@@ -84,13 +87,6 @@
             throw new ActionProcessingException("Failed to process file message: " + fileParams, e);
         }
         
-        // Delete or rename the file...
-        if (fileParams.bPostDelete) {
-            fileParams.oWrkF.delete();
-        } else {
-            AbstractFileAction.renameToDone(fileParams);
-        }
-        
         return message;
     }
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -2,163 +2,48 @@
 
 import java.util.*;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import javax.sql.DataSource;
-
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.helpers.KeyValuePair;
-import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
-import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
-import org.jboss.soa.esb.listeners.SqlTablePoller.ROW_STATE;
+import org.jboss.soa.esb.listeners.SqlTablePoller.SQLPollResult;
 
-public abstract class AbstractSqlRowAction implements ActionProcessor
-{
+/**
+ * Abstract SQL Row action.
+ * <p/>
+ * Convenience class for processing messages from the {@link org.jboss.soa.esb.listeners.SqlTablePoller} listener.
+ * Implementing classes receive a single row resultset through their implementation of the {@link #processResultset(SQLPollResult)}
+ * method.
+ * 
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractSqlRowAction implements ActionProcessor {
 
     protected Logger logger;
-	protected JdbcCleanConn	m_oConn;
 
-	protected AbstractSqlRowAction(String actionName, List<KeyValuePair>properties) throws Exception
-	{
-        
-		DataSource oDS	= new SimpleDataSource(
-                KeyValuePair.getValue(SimpleDataSource.DRIVER, properties),
-                KeyValuePair.getValue(SimpleDataSource.URL, properties),
-                KeyValuePair.getValue(SimpleDataSource.USER, properties),
-                KeyValuePair.getValue(SimpleDataSource.PASSWORD, properties));
-		m_oConn			= new JdbcCleanConn(oDS);
+    protected AbstractSqlRowAction(String actionName,
+            List<KeyValuePair> properties) throws Exception {
+
         logger = Logger.getLogger(getClass());
-	}
-    
-    public Object process(Object message) throws ActionProcessingException {
-        if(!(message instanceof Params)) {
-            throw new ActionProcessingException("Message object for processing by " + getClass().getName() 
-                    + " should have been preprocessed and 'normalised' to a " + Params.class.getName() + " instance.");
+    }
+
+    public final Object process(Object message) throws ActionProcessingException {
+        if (!(message instanceof SQLPollResult)) {
+            throw new ActionProcessingException(
+                    "Message object for processing by "
+                            + getClass().getName()
+                            + " should have been preprocessed and supplied as a "
+                            + SQLPollResult.class.getName() + " instance.");
         }
-        
-        Params sqlParams = (Params)message;
-        PreparedStatement m_PSsel4U   ,m_PSupd;
-        
-        try {
-            m_PSsel4U       = m_oConn.prepareStatement(sqlParams.getSel4Upd());
-            m_PSupd         = m_oConn.prepareStatement(sqlParams.getUpdStmt());
-            int iParm=1;
-            for (String sColName : sqlParams.getKeys())
-            {   
-                Object oVal = sqlParams.getColumnValue(sColName);
-                m_PSsel4U.setObject (iParm  ,oVal);
-                // parameters are +1 in update statement
-                // autoincrement leaves things ready for next SQL parameter
-                m_PSupd.setObject   (++iParm,oVal);
-            }
-        } catch(Exception e) {
-            throw new ActionProcessingException("Exception processing SQL action.", e);
-        }
-        
-  		// will only continue if it can change status to "Working"
-		try
-		{
-	  		if (! changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Pending,ROW_STATE.Working))
-	  		{	logger.warn("Unable to change status to Working");
-	  			m_oConn.rollback();
-	  			cleanup();
-	  			return message;
-	  		}
-	 		m_oConn.commit();			
-		}
-		catch(Exception e)
-		{	logger.error("Unable to change status to Working",e);
-			cleanup();
-			return message;
-		}
 
-		try 
-		{ 
-			processSQL(sqlParams, m_PSsel4U, m_PSupd);
-			changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Working,ROW_STATE.Done);
-			m_oConn.commit();
-		} 
-		catch (Exception e) 
-		{
-			try 
-			{	changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Working,ROW_STATE.Error); 
-				m_oConn.commit();
-			}
-			catch (Exception eErr)
-			{
-				logger.error("Unable to change status to ERROR - Really weird - shouldn't happen");
-			}
-		}
-		finally { cleanup(); }
-        
-        return message;
-	} //________________________________
-    
+        return processResultset((SQLPollResult)message);
+    } // ________________________________
+
     /**
-     * Process the SQL message.
-     * @param sqlParams SQL Pararmeters.
-     * @param ssel4U Select for 
-     * @param supd
+     * Process the SQL poll resultset.
+     * 
+     * @param pollResult SQL Poller resultset.
+     * @return Processing result.
      */
-    protected abstract void processSQL(Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd) throws SQLException;
+    protected abstract Object processResultset(SQLPollResult pollResult) throws ActionProcessingException;
 
-    private boolean changeStatus (Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd, ROW_STATE pFrom, ROW_STATE pTo) throws Exception
-    {   ResultSet RS = m_oConn.execQueryWait(ssel4U,5);
-        if (! RS.next())
-            return false;
-        if (null!=pFrom)
-        {   String sOldStatus = RS.getString(1).substring(0,1);
-            if (!sOldStatus.equalsIgnoreCase(sqlParams.getStatus(pFrom)))
-            {   m_oConn.rollback();
-                return false;
-            }
-        }
-        supd.setString(1, sqlParams.getStatus(pTo));
-        m_oConn.execUpdWait(supd,5);
-        m_oConn.commit();
-        
-        return true;
-    } //______________________________
-	
-	private void cleanup()
-	{	
-		if (null!=m_oConn)
-		{	try	{	m_oConn.rollback(); }
-			catch(Exception e)	{ /* OK just continue */ }
-			m_oConn.release();
-		}
-	} //________________________________
-
-    public static class Params
-    {
-        public String   sUpdStates;
-        public String[] saCols  ,saKeys;
-        public String   sSel4Upd,sUpdate;
-        public Map<String,Object> omVals;
-        public String toString() { return omVals.toString(); }
-
-        protected String getSel4Upd()
-        {   return sSel4Upd;
-        } //________________________________
-
-        protected String getUpdStmt()
-        {   return sUpdate;
-        } //________________________________
-
-        protected String getStatus(ROW_STATE p_oState)
-        {   int iPos = p_oState.ordinal();
-            return sUpdStates.substring(iPos,++iPos);
-        } //________________________________
-
-        protected Object getColumnValue(String p_sKey)
-        {   return omVals.get(p_sKey);
-        } //________________________________
-
-        protected String[] getKeys()
-        {   return saKeys;
-        } //________________________________
-    } 
-
-} //____________________________________________________________________________
+}

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -1,12 +1,12 @@
 package org.jboss.soa.esb.actions;
 
 import java.io.Serializable;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
 import java.text.*;
 import java.util.List;
+import java.util.Vector;
 
 import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.SqlTablePoller.SQLPollResult;
 
 /**
  * Use this class to tune your XML configurations
@@ -20,18 +20,23 @@
  */
 public class DummySqlRowAction extends AbstractSqlRowAction 
 {
+    
+    public static List<SQLPollResult> params = new Vector<SQLPollResult>();
 
-    protected DummySqlRowAction(String actionName, List<KeyValuePair> properties) throws Exception {
+    public DummySqlRowAction(String actionName, List<KeyValuePair> properties) throws Exception {
         super(actionName, properties);
     }
 
     /* (non-Javadoc)
-     * @see org.jboss.soa.esb.actions.AbstractSqlRowAction#processSQL(org.jboss.soa.esb.actions.AbstractSqlRowAction.Params, java.sql.PreparedStatement, java.sql.PreparedStatement)
+     * @see org.jboss.soa.esb.actions.AbstractSqlRowAction#processResultset(SQLPollResult)
      */
     @Override
-    protected void processSQL(Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd) throws SQLException {
-        logger.info("processObject was called with <<"
-                + sqlParams.toString()+">>");
+    protected Object processResultset(SQLPollResult pollResult) {
+        logger.info("processResultset was called with <<"
+                + pollResult.toString()+">>");
+        params.add(pollResult);
+        
+        return pollResult;
     }
 	
 	private SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss.SSS");

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -48,7 +48,7 @@
 
     protected ThreadGroup m_oThrGrp = null;
 
-    protected Logger m_oLogger;
+    protected Logger logger;
 
     protected GpListener m_oDad;
 
@@ -60,7 +60,7 @@
 
     protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
         
-        m_oLogger = Logger.getLogger(this.getClass());
+        logger = Logger.getLogger(this.getClass());
         m_oDad = p_oDad;
         listenerConfig = p_oParms.cloneObj();
         m_oActionDefinitionFactory = actionDefinitionFactory;
@@ -88,9 +88,9 @@
         while (m_oDad.continueLooping()) {
             Object[] processList = receive();
             
-            for (Object oCurr : processList) {
+            for (Object message : processList) {
                 if (m_iQthr >= m_iMaxThr) {
-                    m_oLogger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
                     try {
                         Thread.sleep(m_iSleepForThreads);
                     } catch (InterruptedException e) {
@@ -99,8 +99,8 @@
                     break;
                 }
 
-                // Spawn a thread and push the message object through the pipeline...
-                ActionProcessingPipeline runner = new ActionProcessingPipeline(oCurr);
+                // Spawn a thread and push the message object through the pipeline...
+                ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
                 new Thread(runner).start();
             }
         }
@@ -115,6 +115,20 @@
      * @return An array of Objects received on the channel.
      */
     protected abstract Object[] receive();
+
+    /**
+     * Called on the listener implementation when a pipeline processing error has occurred.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     * @param processor The processor that raised the error.
+     * @param error The error.
+     */
+    protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+    /**
+     * Called on the listener implementation when pipeline processing of a message is complete.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     */
+    protected abstract void processingComplete(Object initialMessage);
     
     /**
      * Close the listener implemenation.
@@ -140,7 +154,7 @@
     /**
      * Action Processing Pipeline.
      * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload object received
+     * Runs the actions in a listener's "actions" config on a message payload object received
      * by the listener implementation.
      * <p/>
      * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
@@ -151,14 +165,14 @@
      */
     private class ActionProcessingPipeline implements Runnable {
         
-        private Object object;
+        private Object initialMessage;
              
         /**
          * Private constructor.
-         * @param initialObject The inital processing target object.
+         * @param initialMessage The initial processing target message.
          */
-        private ActionProcessingPipeline(Object initialObject) {
-            this.object = initialObject;
+        private ActionProcessingPipeline(Object initialMessage) {
+            this.initialMessage = initialMessage;
         }
 
         /* (non-Javadoc)
@@ -166,12 +180,15 @@
          */
         public void run() {
             String currentAction = null;
+            ActionProcessor currentProcessor = null;
             
             // Increment the active thread count for the listener on starting...
             incThreads();
             
             try {
-                // Run the object through each ActionProcessor...
+                Object message = initialMessage;
+
+                // Run the message through each ActionProcessor...
                 for(String action : m_oActions) {
                     ActionDefinition actionDefinition;
 
@@ -182,25 +199,28 @@
                     }
                     
                     // The processing result of each action feeds into the processing of the next action...
-                    ActionProcessor processor = actionDefinition.getProcessor();
+                    currentProcessor = actionDefinition.getProcessor();
                     try {
-                        object = processor.process(object);
+                        message = currentProcessor.process(message);
                     } catch (Exception e) {
-                        GpListener.notifyError(listenerConfig, e, processor.getErrorNotification(object));
+                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
                         throw e;
                     }
                     
-                    if(object == null && action != m_oActions[m_oActions.length - 1]) {
-                        m_oLogger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+                    if(message == null && action != m_oActions[m_oActions.length - 1]) {
+                        logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
                         break;
                     }
                     // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
-                    GpListener.notifyOK(listenerConfig, processor.getOkNotification(object));
+                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
                 }
             } catch(Throwable thrown) {
-                m_oLogger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
+                processingError(initialMessage, currentProcessor, thrown);
+                logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
             }
             
+            processingComplete(initialMessage);
+            
             // Decrement the active thread count for the listener on completion...
             decThreads();
         }

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -77,16 +77,18 @@
                 try {
                     Thread.sleep(m_iPollMillis);
                 } catch (InterruptedException e) {
-                    m_oLogger.error("Unexpected thread interupt exception.  Not terminating blocking receive!!", e);
+                    logger.error("Unexpected thread interupt exception.  Not terminating blocking receive!!", e);
                 }
                 continue;
             } else {
+                Object[] objForProcessing = new Object[olPending.size()];
+                
                 // Preprocess all the message objects.
                 // TODO: I really think this is no longer required or a good idea!!
                 for(int i = 0; i < olPending.size(); i++) {
-                    olPending.set(i, preProcess(olPending.get(i)));
+                    objForProcessing[i] = preProcess(olPending.get(i));
                 }
-                return olPending.toArray();
+                return objForProcessing;
             }
         }
         

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -30,6 +30,8 @@
 import org.jboss.soa.esb.util.*;
 import org.jboss.soa.esb.actions.AbstractFileAction;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.actions.AbstractFileAction.Params;
 import org.jboss.soa.esb.helpers.*;
 
 public class DirectoryPoller extends AbstractPoller
@@ -180,5 +182,27 @@
     @Override
     protected void close() {
     }
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+     */
+    @Override
+    protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+    }
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+     */
+    @Override
+    protected void processingComplete(Object initialMessage) {
+        AbstractFileAction.Params fileParams = (Params) initialMessage;
+        
+        // Delete or rename the file...
+        if (fileParams.bPostDelete) {
+            fileParams.oWrkF.delete();
+        } else {
+            AbstractFileAction.renameToDone(fileParams);
+        }
+    }
     
 } //____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -26,6 +26,7 @@
 import javax.jms.*;
 
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
 import org.jboss.soa.esb.helpers.*;
 
 public class JmsQueueListener extends AbstractListener {
@@ -109,17 +110,17 @@
             try {
                 jmsMessage = jmsMessageReceiver.receive(m_oDad.millisToWait());
             } catch (JMSException oJ) {
-                m_oLogger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
+                logger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
                 for (int i1 = 0; i1 < 3; i1++)
                     try {
                         checkMyParms();
                     } // try to reconnect to the queue
                     catch (Exception e) {
-                        m_oLogger.error("Reconnecting to Queue", e);
+                        logger.error("Reconnecting to Queue", e);
                         try {
                             Thread.sleep(m_iSleepForThreads);
                         } catch (InterruptedException e1) { // Just return
-                            m_oLogger.error("Unexpected thread interupt exception.", e);
+                            logger.error("Unexpected thread interupt exception.", e);
                             return null;
                         }
                     }
@@ -153,4 +154,18 @@
             }
         }
     }
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+     */
+    @Override
+    protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+    }
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+     */
+    @Override
+    protected void processingComplete(Object initialMessage) {
+    }
 } 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -23,14 +23,27 @@
 
 package org.jboss.soa.esb.listeners;
 
-import java.util.*;
-import java.sql.*;
-import javax.sql.*;
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.helpers.persist.*;
-import org.jboss.soa.esb.actions.*;
-import org.jboss.soa.esb.util.*;
+import javax.sql.DataSource;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.util.Util;
+
 /**
  * SqlTablePoller class
  * 
@@ -161,7 +174,7 @@
 	try { checkMyParms(); }
 	catch (Exception e)
 	{
-		m_oLogger.error("checkMyParms() FAILED",e);
+		logger.error("checkMyParms() FAILED",e);
 		throw e;
 	}
   } //__________________________________
@@ -174,7 +187,7 @@
 
 	protected void checkMyParms() throws Exception
     { 
-	  checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER		,null);
+	  checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER	,null);
 	  checkAndStoreAtt(listenerConfig,SimpleDataSource.URL		,null);
 	  checkAndStoreAtt(listenerConfig,SimpleDataSource.USER		,"");
 	  checkAndStoreAtt(listenerConfig,SimpleDataSource.PASSWORD	,"");
@@ -224,7 +237,8 @@
 		return p_o;
 	} //________________________________
 
-	@Override
+	@SuppressWarnings("unchecked")
+    @Override
 	protected List<Object> pollForCandidates() 
 	{
 		String sSel4U	= selectForUpdStatement();
@@ -238,27 +252,30 @@
 
 			PreparedStatement PS = oConn.prepareStatement(sScan);
 			ResultSet RS = oConn.execQueryWait(PS,1);
-			while (RS.next())
-			{	Map<String,Object> oColVals = new HashMap<String,Object>();
+			while (RS.next()) {	
+                SQLPollResult rowParams = new SQLPollResult(sSel4U, sUpdStmt);
 				int iCurr = 0;
-				for (String sColName : m_saCols)
-					oColVals.put(sColName,RS.getObject(++iCurr));
 
+                for (String sColName : m_saCols) {
+                    rowParams.put(sColName,RS.getObject(++iCurr));
+                }
+
 				// Set up the parameter object for the SqlRowAction
-				AbstractSqlRowAction.Params oActionP = new AbstractSqlRowAction.Params();
-				oActionP.omVals		= oColVals;
-				oActionP.sUpdStates	= m_sUpdStates;
-				oActionP.saCols		= m_saCols;
-				oActionP.saKeys		= m_saKeys;
-				oActionP.sSel4Upd	= sSel4U;
-				oActionP.sUpdate	= sUpdStmt;
+				rowParams.sUpdStates	= m_sUpdStates;
+				rowParams.saCols		= m_saCols;
+				rowParams.saKeys		= m_saKeys;
+				rowParams.sSel4Upd	= sSel4U;
+				rowParams.sUpdate	= sUpdStmt;
+                
+                // Mark the row as "working"...
+                rowParams.changeStatusToWorking();
 				
-				oResults.add(oActionP);
+				oResults.add(rowParams);
 			}
 		}
 		catch (Exception e)
 		{
-			m_oLogger.warn("Some triggers might not have been returned",e);
+			logger.warn("Some triggers might not have been returned",e);
 		}
 		finally
 		{
@@ -266,6 +283,7 @@
 				oConn.release();
 		}
 		
+        logger.info("Returning " + oResults.size() + " rows.");
 		return oResults;
 	} //________________________________
 
@@ -369,4 +387,120 @@
     protected void close() {
     }
 
-} //____________________________________________________________________________
+    /* (non-Javadoc) Flags the polled row as failed; the processor and error arguments are not used here.
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+     */
+    @Override
+    protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+        // Mark the row as "error" (Working -> Error); initialMessage is cast unchecked and the boolean result is ignored.
+        ((SQLPollResult)initialMessage).changeStatusToError();
+    }
+
+    /* (non-Javadoc) Flags the polled row as successfully processed.
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+     */
+    @Override
+    protected void processingComplete(Object initialMessage) {
+        // Mark the row as "done" (Working -> Done); initialMessage is cast unchecked and the boolean result is ignored.
+        ((SQLPollResult)initialMessage).changeStatusToDone();
+    }
+
+    public class SQLPollResult extends LinkedHashMap implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String sUpdStates;
+        
+        private String[] saCols, saKeys;
+
+        private String sSel4Upd, sUpdate;
+
+        private SQLPollResult(String sSel4Upd, String sUpdate) throws Exception {
+            this.sSel4Upd = sSel4Upd;
+            this.sUpdate = sUpdate;    
+        }
+
+        private String getStatus(ROW_STATE p_oState) {
+            int iPos = p_oState.ordinal();
+            return sUpdStates.substring(iPos, ++iPos);
+        }
+        
+        private boolean changeStatusToWorking() {
+            return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+        }
+        
+        private boolean changeStatusToDone() {
+            return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+        }
+        
+        private boolean changeStatusToError() {
+            return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+        }
+
+        private boolean changeStatus(ROW_STATE fromState, ROW_STATE toState) {
+            JdbcCleanConn dbConnection = null;
+            
+            try {
+                // This is expensive at the moment but will be OK once we get proper connection pooling enabled!
+                dbConnection = newDbConn();
+            } catch (Exception e) {
+                logger.error("Unable to get DB connection.", e);
+                throw new IllegalStateException("Unable to get DB connection.", e);
+            }
+            
+            try {
+                PreparedStatement m_PSsel4U;
+                PreparedStatement m_PSupd;
+
+                m_PSsel4U       = dbConnection.prepareStatement(sSel4Upd);
+                m_PSupd         = dbConnection.prepareStatement(sUpdate);
+       
+                int iParm=1;
+                for (String sColName : saKeys) {   
+                    Object oVal = get(sColName);
+                    m_PSsel4U.setObject (iParm  ,oVal);
+                    // parameters are +1 in update statement
+                    // autoincrement leaves things ready for next SQL parameter
+                    m_PSupd.setObject   (++iParm,oVal);
+                }
+
+                try {
+                    ResultSet resultSet = dbConnection.execQueryWait(m_PSsel4U, 5);
+                    
+                    if (resultSet.next()) {
+                        String sOldStatus = resultSet.getString(1).substring(0, 1);
+                     
+                        if (sOldStatus.equalsIgnoreCase(getStatus(fromState))) {
+                            m_PSupd.setString(1, getStatus(toState));
+                            dbConnection.execUpdWait(m_PSupd, 5);
+                            dbConnection.commit();
+
+                            if(logger.isDebugEnabled()) {
+                                logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
+                            }
+                            
+                            return true;
+                        } else {
+                            logger.warn("Cannot change row state from " + fromState + " to " + toState + ".  Row not in state " + fromState);
+                            return false;
+                        }
+                    }
+                    logger.error("Row status change to " + toState + " has failed.  Rolling back!!");
+                } catch(Exception e) {
+                    logger.error("Row status change to " + toState + " has failed.  Rolling back!!", e);
+                }
+                
+                try {
+                    dbConnection.rollback();
+                } catch (Exception e) {
+                    logger.error("Unable to rollback row status change to " + fromState.name(), e);
+                }
+            } catch (Exception e) {
+                logger.error("Unexpected exception.", e);
+            } finally {
+                dbConnection.release();
+            }
+
+            return false;
+        }
+    }
+}

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-09-15 15:41:52 UTC (rev 6239)
@@ -6,6 +6,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
 import org.jboss.soa.esb.helpers.DomElement;
 import org.jboss.soa.esb.listeners.AbstractPoller;
 import org.jboss.soa.esb.listeners.GpListener;
@@ -73,4 +74,18 @@
     @Override
     protected void close() {
     }
+
+    /* (non-Javadoc) No-op in this mock poller: nothing is done when processing fails.
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+     */
+    @Override
+    protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+    }
+
+    /* (non-Javadoc) No-op in this mock poller: nothing is done when processing completes.
+     * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+     */
+    @Override
+    protected void processingComplete(Object initialMessage) {
+    }
 }




More information about the jboss-svn-commits mailing list