[jboss-svn-commits] JBL Code SVN: r6239 - in labs/jbossesb/workspace/tfennelly/product/core: common/src/org/jboss/soa/esb/helpers/persist listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/tests/src/org/jboss/soa/esb/util
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Sep 15 11:42:01 EDT 2006
Author: tfennelly
Date: 2006-09-15 11:41:52 -0400 (Fri, 15 Sep 2006)
New Revision: 6239
Modified:
labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Log:
cleaning up the SQLTable poller stuff.
Modified: labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/common/src/org/jboss/soa/esb/helpers/persist/JdbcCleanConn.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -47,15 +47,17 @@
}
public void commit() throws Exception {
- if (null != m_conn)
+ if (null != m_conn) {
m_conn.commit();
+ }
}
public void rollback() throws Exception {
- if (null != m_conn)
+ if (null != m_conn) {
m_conn.rollback();
+ }
}
-
+
public void release() {
if (null != m_conn) {
try {
@@ -79,23 +81,29 @@
public PreparedStatement prepareStatement(String p_sSt, int p_i1, int p_i2)
throws Exception {
- if (null == m_conn)
- connect();
+ if (null == m_conn) {
+ connect();
+ }
PreparedStatement PS = m_conn.prepareStatement(p_sSt, p_i1, p_i2);
m_olPrepSt.add(PS);
return PS;
} // __________________________________
public PreparedStatement prepareStatement(String p_sSt) throws Exception {
- if (null == m_conn)
+ if (null == m_conn) {
connect();
+ }
+
PreparedStatement PS = m_conn.prepareStatement(p_sSt);
m_olPrepSt.add(PS);
return PS;
} // __________________________________
- public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry)
- throws Exception {
+ public ResultSet execQueryWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
+ if (null == m_conn) {
+ connect();
+ }
+
Exception eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
for (int i1 = 0; i1 < iQtry; i1++) {
@@ -113,8 +121,11 @@
throw eRet;
} // __________________________________
- public void execUpdWait(PreparedStatement p_PS, int p_iQtry)
- throws Exception {
+ public void execUpdWait(PreparedStatement p_PS, int p_iQtry) throws Exception {
+ if (null == m_conn) {
+ connect();
+ }
+
Exception eRet = null;
int iQtry = (p_iQtry < 1) ? 1 : (p_iQtry < 50) ? p_iQtry : 50;
for (int i1 = 0; i1 < iQtry; i1++) {
@@ -134,6 +145,10 @@
} // __________________________________
private void connect() throws Exception {
+ if(m_conn != null) {
+ return;
+ }
+
Exception eRet = null;
for (int i1 = 0; i1 < 5; i1++) {
try {
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
/**
* <p/>
@@ -36,7 +37,9 @@
*/
public abstract class AbstractFileAction implements ActionProcessor {
- public static class Params {
+ public static class Params implements Serializable {
+ private static final long serialVersionUID = 1L;
+
public boolean bPostDelete;
public File oInpF, oWrkF, oErrF, oDoneF;
@@ -84,13 +87,6 @@
throw new ActionProcessingException("Failed to process file message: " + fileParams, e);
}
- // Delete or rename the file...
- if (fileParams.bPostDelete) {
- fileParams.oWrkF.delete();
- } else {
- AbstractFileAction.renameToDone(fileParams);
- }
-
return message;
}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -2,163 +2,48 @@
import java.util.*;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import javax.sql.DataSource;
-
import org.apache.log4j.Logger;
import org.jboss.soa.esb.helpers.KeyValuePair;
-import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
-import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
-import org.jboss.soa.esb.listeners.SqlTablePoller.ROW_STATE;
+import org.jboss.soa.esb.listeners.SqlTablePoller.SQLPollResult;
-public abstract class AbstractSqlRowAction implements ActionProcessor
-{
+/**
+ * Abstract SQL Row action.
+ * <p/>
+ * Convenience class for processing messages from the {@link org.jboss.soa.esb.listeners.SqlTablePoller} listener.
+ * Implementing classes receive a single row resultset through their implementation of the {@link #processResultset(SQLPollResult)}
+ * method.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractSqlRowAction implements ActionProcessor {
protected Logger logger;
- protected JdbcCleanConn m_oConn;
- protected AbstractSqlRowAction(String actionName, List<KeyValuePair>properties) throws Exception
- {
-
- DataSource oDS = new SimpleDataSource(
- KeyValuePair.getValue(SimpleDataSource.DRIVER, properties),
- KeyValuePair.getValue(SimpleDataSource.URL, properties),
- KeyValuePair.getValue(SimpleDataSource.USER, properties),
- KeyValuePair.getValue(SimpleDataSource.PASSWORD, properties));
- m_oConn = new JdbcCleanConn(oDS);
+ protected AbstractSqlRowAction(String actionName,
+ List<KeyValuePair> properties) throws Exception {
+
logger = Logger.getLogger(getClass());
- }
-
- public Object process(Object message) throws ActionProcessingException {
- if(!(message instanceof Params)) {
- throw new ActionProcessingException("Message object for processing by " + getClass().getName()
- + " should have been preprocessed and 'normalised' to a " + Params.class.getName() + " instance.");
+ }
+
+ public final Object process(Object message) throws ActionProcessingException {
+ if (!(message instanceof SQLPollResult)) {
+ throw new ActionProcessingException(
+ "Message object for processing by "
+ + getClass().getName()
+ + " should have been preprocessed and supplied as a "
+ + SQLPollResult.class.getName() + " instance.");
}
-
- Params sqlParams = (Params)message;
- PreparedStatement m_PSsel4U ,m_PSupd;
-
- try {
- m_PSsel4U = m_oConn.prepareStatement(sqlParams.getSel4Upd());
- m_PSupd = m_oConn.prepareStatement(sqlParams.getUpdStmt());
- int iParm=1;
- for (String sColName : sqlParams.getKeys())
- {
- Object oVal = sqlParams.getColumnValue(sColName);
- m_PSsel4U.setObject (iParm ,oVal);
- // parameters are +1 in update statement
- // autoincrement leaves things ready for next SQL parameter
- m_PSupd.setObject (++iParm,oVal);
- }
- } catch(Exception e) {
- throw new ActionProcessingException("Exception processing SQL action.", e);
- }
-
- // will only continue if it can change status to "Working"
- try
- {
- if (! changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Pending,ROW_STATE.Working))
- { logger.warn("Unable to change status to Working");
- m_oConn.rollback();
- cleanup();
- return message;
- }
- m_oConn.commit();
- }
- catch(Exception e)
- { logger.error("Unable to change status to Working",e);
- cleanup();
- return message;
- }
- try
- {
- processSQL(sqlParams, m_PSsel4U, m_PSupd);
- changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Working,ROW_STATE.Done);
- m_oConn.commit();
- }
- catch (Exception e)
- {
- try
- { changeStatus(sqlParams, m_PSsel4U, m_PSupd, ROW_STATE.Working,ROW_STATE.Error);
- m_oConn.commit();
- }
- catch (Exception eErr)
- {
- logger.error("Unable to change status to ERROR - Really weird - shouldn't happen");
- }
- }
- finally { cleanup(); }
-
- return message;
- } //________________________________
-
+ return processResultset((SQLPollResult)message);
+ } // ________________________________
+
/**
- * Process the SQL message.
- * @param sqlParams SQL Pararmeters.
- * @param ssel4U Select for
- * @param supd
+ * Process the SQL poll resultset.
+ *
+ * @param pollResult SQL Poller resultset.
+ * @return Processing result.
*/
- protected abstract void processSQL(Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd) throws SQLException;
+ protected abstract Object processResultset(SQLPollResult pollResult) throws ActionProcessingException;
- private boolean changeStatus (Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd, ROW_STATE pFrom, ROW_STATE pTo) throws Exception
- { ResultSet RS = m_oConn.execQueryWait(ssel4U,5);
- if (! RS.next())
- return false;
- if (null!=pFrom)
- { String sOldStatus = RS.getString(1).substring(0,1);
- if (!sOldStatus.equalsIgnoreCase(sqlParams.getStatus(pFrom)))
- { m_oConn.rollback();
- return false;
- }
- }
- supd.setString(1, sqlParams.getStatus(pTo));
- m_oConn.execUpdWait(supd,5);
- m_oConn.commit();
-
- return true;
- } //______________________________
-
- private void cleanup()
- {
- if (null!=m_oConn)
- { try { m_oConn.rollback(); }
- catch(Exception e) { /* OK just continue */ }
- m_oConn.release();
- }
- } //________________________________
-
- public static class Params
- {
- public String sUpdStates;
- public String[] saCols ,saKeys;
- public String sSel4Upd,sUpdate;
- public Map<String,Object> omVals;
- public String toString() { return omVals.toString(); }
-
- protected String getSel4Upd()
- { return sSel4Upd;
- } //________________________________
-
- protected String getUpdStmt()
- { return sUpdate;
- } //________________________________
-
- protected String getStatus(ROW_STATE p_oState)
- { int iPos = p_oState.ordinal();
- return sUpdStates.substring(iPos,++iPos);
- } //________________________________
-
- protected Object getColumnValue(String p_sKey)
- { return omVals.get(p_sKey);
- } //________________________________
-
- protected String[] getKeys()
- { return saKeys;
- } //________________________________
- }
-
-} //____________________________________________________________________________
+}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -1,12 +1,12 @@
package org.jboss.soa.esb.actions;
import java.io.Serializable;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.text.*;
import java.util.List;
+import java.util.Vector;
import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.SqlTablePoller.SQLPollResult;
/**
* Use this class to tune your XML configurations
@@ -20,18 +20,23 @@
*/
public class DummySqlRowAction extends AbstractSqlRowAction
{
+
+ public static List<SQLPollResult> params = new Vector<SQLPollResult>();
- protected DummySqlRowAction(String actionName, List<KeyValuePair> properties) throws Exception {
+ public DummySqlRowAction(String actionName, List<KeyValuePair> properties) throws Exception {
super(actionName, properties);
}
/* (non-Javadoc)
- * @see org.jboss.soa.esb.actions.AbstractSqlRowAction#processSQL(org.jboss.soa.esb.actions.AbstractSqlRowAction.Params, java.sql.PreparedStatement, java.sql.PreparedStatement)
+ * @see org.jboss.soa.esb.actions.AbstractSqlRowAction#processResultset(org.jboss.soa.esb.listeners.SqlTablePoller.SQLPollResult)
*/
@Override
- protected void processSQL(Params sqlParams, PreparedStatement ssel4U, PreparedStatement supd) throws SQLException {
- logger.info("processObject was called with <<"
- + sqlParams.toString()+">>");
+ protected Object processResultset(SQLPollResult pollResult) {
+ logger.info("processResultset was called with <<"
+ + pollResult.toString()+">>");
+ params.add(pollResult);
+
+ return pollResult;
}
private SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss.SSS");
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -48,7 +48,7 @@
protected ThreadGroup m_oThrGrp = null;
- protected Logger m_oLogger;
+ protected Logger logger;
protected GpListener m_oDad;
@@ -60,7 +60,7 @@
protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
- m_oLogger = Logger.getLogger(this.getClass());
+ logger = Logger.getLogger(this.getClass());
m_oDad = p_oDad;
listenerConfig = p_oParms.cloneObj();
m_oActionDefinitionFactory = actionDefinitionFactory;
@@ -88,9 +88,9 @@
while (m_oDad.continueLooping()) {
Object[] processList = receive();
- for (Object oCurr : processList) {
+ for (Object message : processList) {
if (m_iQthr >= m_iMaxThr) {
- m_oLogger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+ logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
try {
Thread.sleep(m_iSleepForThreads);
} catch (InterruptedException e) {
@@ -99,8 +99,8 @@
break;
}
- // Spawn a thread and push the message object through the pipeline...
- ActionProcessingPipeline runner = new ActionProcessingPipeline(oCurr);
+ // Spawn a thread and push the message through the pipeline...
+ ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
new Thread(runner).start();
}
}
@@ -115,6 +115,20 @@
* @return An array of Objects received on the channel.
*/
protected abstract Object[] receive();
+
+ /**
+ * Called on the listener implementation when a pipeline processing error has occurred.
+ * @param initialMessage The message reference that was initially supplied to the pipeline.
+ * @param processor The processor that raised the error.
+ * @param error The error.
+ */
+ protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+ /**
+ * Called on the listener implementation when pipeline processing of a message is complete.
+ * @param initialMessage The message reference that was initially supplied to the pipeline.
+ */
+ protected abstract void processingComplete(Object initialMessage);
/**
* Close the listener implemenation.
@@ -140,7 +154,7 @@
/**
* Action Processing Pipeline.
* <p/>
- * Runs the actions in a listeners "actions" config on a message payload object received
+ * Runs the actions in a listener's "actions" config on a message payload received
* by the listener implementation.
* <p/>
* TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
@@ -151,14 +165,14 @@
*/
private class ActionProcessingPipeline implements Runnable {
- private Object object;
+ private Object initialMessage;
/**
* Private constructor.
- * @param initialObject The inital processing target object.
+ * @param initialMessage The initial processing target message.
*/
- private ActionProcessingPipeline(Object initialObject) {
- this.object = initialObject;
+ private ActionProcessingPipeline(Object initialMessage) {
+ this.initialMessage = initialMessage;
}
/* (non-Javadoc)
@@ -166,12 +180,15 @@
*/
public void run() {
String currentAction = null;
+ ActionProcessor currentProcessor = null;
// Increment the active thread count for the listener on starting...
incThreads();
try {
- // Run the object through each ActionProcessor...
+ Object message = initialMessage;
+
+ // Run the message through each ActionProcessor...
for(String action : m_oActions) {
ActionDefinition actionDefinition;
@@ -182,25 +199,28 @@
}
// The processing result of each action feeds into the processing of the next action...
- ActionProcessor processor = actionDefinition.getProcessor();
+ currentProcessor = actionDefinition.getProcessor();
try {
- object = processor.process(object);
+ message = currentProcessor.process(message);
} catch (Exception e) {
- GpListener.notifyError(listenerConfig, e, processor.getErrorNotification(object));
+ GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
throw e;
}
- if(object == null && action != m_oActions[m_oActions.length - 1]) {
- m_oLogger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+ if(message == null && action != m_oActions[m_oActions.length - 1]) {
+ logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
break;
}
// Notify on all processors. May want to do this differently in the future i.e. more selectively ...
- GpListener.notifyOK(listenerConfig, processor.getOkNotification(object));
+ GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
}
} catch(Throwable thrown) {
- m_oLogger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
+ processingError(initialMessage, currentProcessor, thrown);
+ logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
}
+ processingComplete(initialMessage);
+
// Decrement the active thread count for the listener on completion...
decThreads();
}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -77,16 +77,18 @@
try {
Thread.sleep(m_iPollMillis);
} catch (InterruptedException e) {
- m_oLogger.error("Unexpected thread interupt exception. Not terminating blocking receive!!", e);
+ logger.error("Unexpected thread interupt exception. Not terminating blocking receive!!", e);
}
continue;
} else {
+ Object[] objForProcessing = new Object[olPending.size()];
+
// Preprocess all the message objects.
// TODO: I really think this is no longer required or a good idea!!
for(int i = 0; i < olPending.size(); i++) {
- olPending.set(i, preProcess(olPending.get(i)));
+ objForProcessing[i] = preProcess(olPending.get(i));
}
- return olPending.toArray();
+ return objForProcessing;
}
}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -30,6 +30,8 @@
import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.actions.AbstractFileAction;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.actions.AbstractFileAction.Params;
import org.jboss.soa.esb.helpers.*;
public class DirectoryPoller extends AbstractPoller
@@ -180,5 +182,27 @@
@Override
protected void close() {
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ AbstractFileAction.Params fileParams = (Params) initialMessage;
+
+ // Delete or rename the file...
+ if (fileParams.bPostDelete) {
+ fileParams.oWrkF.delete();
+ } else {
+ AbstractFileAction.renameToDone(fileParams);
+ }
+ }
} //____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -26,6 +26,7 @@
import javax.jms.*;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
import org.jboss.soa.esb.helpers.*;
public class JmsQueueListener extends AbstractListener {
@@ -109,17 +110,17 @@
try {
jmsMessage = jmsMessageReceiver.receive(m_oDad.millisToWait());
} catch (JMSException oJ) {
- m_oLogger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
+ logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
for (int i1 = 0; i1 < 3; i1++)
try {
checkMyParms();
} // try to reconnect to the queue
catch (Exception e) {
- m_oLogger.error("Reconnecting to Queue", e);
+ logger.error("Reconnecting to Queue", e);
try {
Thread.sleep(m_iSleepForThreads);
} catch (InterruptedException e1) { // Just return
- m_oLogger.error("Unexpected thread interupt exception.", e);
+ logger.error("Unexpected thread interupt exception.", e);
return null;
}
}
@@ -153,4 +154,18 @@
}
}
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ }
}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -23,14 +23,27 @@
package org.jboss.soa.esb.listeners;
-import java.util.*;
-import java.sql.*;
-import javax.sql.*;
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.helpers.persist.*;
-import org.jboss.soa.esb.actions.*;
-import org.jboss.soa.esb.util.*;
+import javax.sql.DataSource;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.util.Util;
+
/**
* SqlTablePoller class
*
@@ -161,7 +174,7 @@
try { checkMyParms(); }
catch (Exception e)
{
- m_oLogger.error("checkMyParms() FAILED",e);
+ logger.error("checkMyParms() FAILED",e);
throw e;
}
} //__________________________________
@@ -174,7 +187,7 @@
protected void checkMyParms() throws Exception
{
- checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER ,null);
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER ,null);
checkAndStoreAtt(listenerConfig,SimpleDataSource.URL ,null);
checkAndStoreAtt(listenerConfig,SimpleDataSource.USER ,"");
checkAndStoreAtt(listenerConfig,SimpleDataSource.PASSWORD ,"");
@@ -224,7 +237,8 @@
return p_o;
} //________________________________
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
protected List<Object> pollForCandidates()
{
String sSel4U = selectForUpdStatement();
@@ -238,27 +252,30 @@
PreparedStatement PS = oConn.prepareStatement(sScan);
ResultSet RS = oConn.execQueryWait(PS,1);
- while (RS.next())
- { Map<String,Object> oColVals = new HashMap<String,Object>();
+ while (RS.next()) {
+ SQLPollResult rowParams = new SQLPollResult(sSel4U, sUpdStmt);
int iCurr = 0;
- for (String sColName : m_saCols)
- oColVals.put(sColName,RS.getObject(++iCurr));
+ for (String sColName : m_saCols) {
+ rowParams.put(sColName,RS.getObject(++iCurr));
+ }
+
// Set up the parameter object for the SqlRowAction
- AbstractSqlRowAction.Params oActionP = new AbstractSqlRowAction.Params();
- oActionP.omVals = oColVals;
- oActionP.sUpdStates = m_sUpdStates;
- oActionP.saCols = m_saCols;
- oActionP.saKeys = m_saKeys;
- oActionP.sSel4Upd = sSel4U;
- oActionP.sUpdate = sUpdStmt;
+ rowParams.sUpdStates = m_sUpdStates;
+ rowParams.saCols = m_saCols;
+ rowParams.saKeys = m_saKeys;
+ rowParams.sSel4Upd = sSel4U;
+ rowParams.sUpdate = sUpdStmt;
+
+ // Mark the row as "working"...
+ rowParams.changeStatusToWorking();
- oResults.add(oActionP);
+ oResults.add(rowParams);
}
}
catch (Exception e)
{
- m_oLogger.warn("Some triggers might not have been returned",e);
+ logger.warn("Some triggers might not have been returned",e);
}
finally
{
@@ -266,6 +283,7 @@
oConn.release();
}
+ logger.info("Returning " + oResults.size() + " rows.");
return oResults;
} //________________________________
@@ -369,4 +387,120 @@
protected void close() {
}
-} //____________________________________________________________________________
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ // Mark the row as "error"...
+ ((SQLPollResult)initialMessage).changeStatusToError();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ // Mark the row as "done"...
+ ((SQLPollResult)initialMessage).changeStatusToDone();
+ }
+
+ public class SQLPollResult extends LinkedHashMap implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String sUpdStates;
+
+ private String[] saCols, saKeys;
+
+ private String sSel4Upd, sUpdate;
+
+ private SQLPollResult(String sSel4Upd, String sUpdate) throws Exception {
+ this.sSel4Upd = sSel4Upd;
+ this.sUpdate = sUpdate;
+ }
+
+ private String getStatus(ROW_STATE p_oState) {
+ int iPos = p_oState.ordinal();
+ return sUpdStates.substring(iPos, ++iPos);
+ }
+
+ private boolean changeStatusToWorking() {
+ return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+ }
+
+ private boolean changeStatusToDone() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+ }
+
+ private boolean changeStatusToError() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+ }
+
+ private boolean changeStatus(ROW_STATE fromState, ROW_STATE toState) {
+ JdbcCleanConn dbConnection = null;
+
+ try {
+ // This is expensive at the moment but will be OK once we get proper connection pooling enabled!
+ dbConnection = newDbConn();
+ } catch (Exception e) {
+ logger.error("Unable to get DB connection.", e);
+ throw new IllegalStateException("Unable to get DB connection.", e);
+ }
+
+ try {
+ PreparedStatement m_PSsel4U;
+ PreparedStatement m_PSupd;
+
+ m_PSsel4U = dbConnection.prepareStatement(sSel4Upd);
+ m_PSupd = dbConnection.prepareStatement(sUpdate);
+
+ int iParm=1;
+ for (String sColName : saKeys) {
+ Object oVal = get(sColName);
+ m_PSsel4U.setObject (iParm ,oVal);
+ // parameters are +1 in update statement
+ // autoincrement leaves things ready for next SQL parameter
+ m_PSupd.setObject (++iParm,oVal);
+ }
+
+ try {
+ ResultSet resultSet = dbConnection.execQueryWait(m_PSsel4U, 5);
+
+ if (resultSet.next()) {
+ String sOldStatus = resultSet.getString(1).substring(0, 1);
+
+ if (sOldStatus.equalsIgnoreCase(getStatus(fromState))) {
+ m_PSupd.setString(1, getStatus(toState));
+ dbConnection.execUpdWait(m_PSupd, 5);
+ dbConnection.commit();
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
+ }
+
+ return true;
+ } else {
+ logger.warn("Cannot change row state from " + fromState + " to " + toState + ". Row not in state " + fromState);
+ return false;
+ }
+ }
+ logger.error("Row status change to " + toState + " has failed. Rolling back!!");
+ } catch(Exception e) {
+ logger.error("Row status change to " + toState + " has failed. Rolling back!!", e);
+ }
+
+ try {
+ dbConnection.rollback();
+ } catch (Exception e) {
+ logger.error("Unable to rollback row status change to " + fromState.name(), e);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception.", e);
+ } finally {
+ dbConnection.release();
+ }
+
+ return false;
+ }
+ }
+}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-09-15 14:20:33 UTC (rev 6238)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-09-15 15:41:52 UTC (rev 6239)
@@ -6,6 +6,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
import org.jboss.soa.esb.helpers.DomElement;
import org.jboss.soa.esb.listeners.AbstractPoller;
import org.jboss.soa.esb.listeners.GpListener;
@@ -73,4 +74,18 @@
@Override
protected void close() {
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ }
}
More information about the jboss-svn-commits
mailing list