[jboss-svn-commits] JBL Code SVN: r6228 - in labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb: actions command listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Sep 14 11:21:06 EDT 2006
Author: tfennelly
Date: 2006-09-14 11:21:01 -0400 (Thu, 14 Sep 2006)
New Revision: 6228
Added:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
Removed:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
Modified:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
ActionProcessor pipeline model changes
Deleted: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -1,43 +0,0 @@
-package org.jboss.soa.esb.actions;
-
-import java.io.Serializable;
-import java.util.Observable;
-import org.apache.log4j.Logger;
-
-import org.jboss.soa.esb.helpers.DomElement;
-import org.jboss.soa.esb.listeners.GpListener;
-
-public abstract class AbstractAction extends Observable
- implements Runnable
-{
- public abstract void processCurrentObject() throws Exception;
- public abstract Serializable getOkNotification();
- public abstract Serializable getErrorNotification();
-
- protected DomElement m_oParms;
- protected Object m_oCurr;
- protected Logger m_oLogger = Logger.getLogger(this.getClass());
-
- protected AbstractAction(DomElement p_oP, Object p_oCurr)
- { m_oParms = p_oP;
- m_oCurr = p_oCurr;
- } //________________________________
-
- public void run()
- {
- try
- {
- processCurrentObject();
- GpListener.notifyOK(m_oParms,getOkNotification());
- }
- catch (Exception e)
- {
- GpListener.notifyError(m_oParms,e,getErrorNotification());
- }
- finally
- { setChanged();
- notifyObservers(new Integer(-1));
- }
- } //________________________________
-
-} //____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -2,7 +2,7 @@
import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.soa.esb.helpers.DomElement;
@@ -28,7 +28,7 @@
private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
private String name;
- private BlockingQueue<String> queue = new SynchronousQueue<String>();
+ private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
public void open(DomElement config) throws CommandQueueException {
if(config == null) {
@@ -50,6 +50,13 @@
*/
public void addCommand(String command) {
queue.add(command);
+ while(!queue.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
public String receiveCommand(long timeout) throws CommandQueueException {
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -0,0 +1,209 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.util.*;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.*;
+
+/**
+ * Base abstract message listener implementation.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractListener implements Runnable {
+
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
+ protected int m_iSleepForThreads = 3000; // default sleep if no threads available
+ protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+ protected int m_iQthr = 0, m_iMaxThr;
+
+ protected ThreadGroup m_oThrGrp = null;
+
+ protected Logger m_oLogger;
+
+ protected GpListener m_oDad;
+
+ protected DomElement listenerConfig;
+
+ protected String[] m_oActions;
+
+ protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+ protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+
+ m_oLogger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ listenerConfig = p_oParms.cloneObj();
+ m_oActionDefinitionFactory = actionDefinitionFactory;
+ m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+
+ String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+ m_oActions = sAtt.split(",");
+
+ if(m_oActions.length == 0) {
+ throw new ConfigurationException("Listener 'actions' list must be specified.");
+ }
+
+ sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ } // __________________________________
+
+ /**
+ * Implement run method for this Runnable <p/> Will continue to run until
+ * controlling class (ref in m_oDad) indicates no more looping allowed for
+ * all child classes <p/> This condition will not prevent child processes to
+ * finish normally
+ */
+ public void run() {
+ while (m_oDad.continueLooping()) {
+ Object[] processList = receive();
+
+ for (Object oCurr : processList) {
+ if (m_iQthr >= m_iMaxThr) {
+ m_oLogger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return;
+ }
+ break;
+ }
+
+ // Spawn a thread and push the message object through the pipeline...
+ ActionProcessingPipeline runner = new ActionProcessingPipeline(oCurr);
+ new Thread(runner).start();
+ }
+ }
+
+ close();
+ }
+
+ /**
+ * Receive message from underlying channel implementation.
+ * <p/>
+ * Implementations must perform a blocking receive.
+ * @return An array of Objects received on the channel.
+ */
+ protected abstract Object[] receive();
+
+ /**
+ * Close the listener implemenation.
+ * <p/>
+ * Allows the listener to perform relevant close/cleanup tasks.
+ */
+ protected abstract void close();
+
+ /**
+ * Increment the active thread count.
+ */
+ private void incThreads() {
+ m_iQthr++;
+ }
+
+ /**
+ * Decrement the active thread count.
+ */
+ private void decThreads() {
+ m_iQthr--;
+ }
+
+ /**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs the actions in a listeners "actions" config on a message payload object received
+ * by the listener implementation.
+ * <p/>
+ * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
+ * overall cleanup of these classes. Lots of duplicate code etc.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ private class ActionProcessingPipeline implements Runnable {
+
+ private Object object;
+
+ /**
+ * Private constructor.
+ * @param initialObject The inital processing target object.
+ */
+ private ActionProcessingPipeline(Object initialObject) {
+ this.object = initialObject;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ String currentAction = null;
+
+ // Increment the active thread count for the listener on starting...
+ incThreads();
+
+ try {
+ // Run the object through each ActionProcessor...
+ for(String action : m_oActions) {
+ ActionDefinition actionDefinition;
+
+ currentAction = action.trim();
+ actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+ if(actionDefinition == null) {
+ throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
+ }
+
+ // The processing result of each action feeds into the processing of the next action...
+ ActionProcessor processor = actionDefinition.getProcessor();
+ try {
+ object = processor.process(object);
+ } catch (Exception e) {
+ GpListener.notifyError(listenerConfig, e, processor.getErrorNotification(object));
+ throw e;
+ }
+
+ if(object == null && action != m_oActions[m_oActions.length - 1]) {
+ m_oLogger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+ break;
+ }
+ // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
+ GpListener.notifyOK(listenerConfig, processor.getOkNotification(object));
+ }
+ } catch(Throwable thrown) {
+ m_oLogger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
+ }
+
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
+ }
+ }
+
+} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -24,205 +24,86 @@
import java.util.*;
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
import org.jboss.soa.esb.helpers.*;
-public abstract class AbstractPoller implements Runnable {
- protected abstract List<Object> pollForCandidates();
+/**
+ * Abstract Polling Listener.
+ * <p/>
+ * Polling listeners are listener implementations that periodically poll for message objects
+ * that require processing. This type of listener implementation is required where the underlying
+ * message channel doesn't support a blocking receive operation.
+ *
+ * @author Esteban
+ */
+public abstract class AbstractPoller extends AbstractListener {
- protected abstract Object preProcess(Object p_o) throws Exception;
-
// You can override these values at constructor time of your
// derived class after calling super(GpListener,DomElement)
protected int m_iMinPollMillis = 3000 // minimum polling interval
, m_iDfltPollMillis = 20000 // default polling interval
- , m_iSleepForThreads = 3000 // default sleep if no threads available
- , m_iUpperThreadLimit = 10 // just in case - override if you wish
;
public static final String PARM_POLL_LTCY = "pollLatencySecs";
- protected int m_iQthr = 0, m_iMaxThr;
-
protected int m_iPollMillis;
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
-
- protected GpListener m_oDad;
-
- protected DomElement m_oParms;
-
- protected String[] m_oActions;
-
- protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
- protected AbstractPoller(GpListener p_oDad, DomElement p_oParms,
- ActionDefinitionFactory actionDefinitionFactory) throws Exception {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- m_oActionDefinitionFactory = actionDefinitionFactory;
- checkParms();
- } // __________________________________
-
/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception -
- * if actionClass not specified or not in classpath or invalid
- * int values for maxThreads or pollLatencySecs
+ * Construct an abstract polling listener.
+ * @param commandListener The command listener.
+ * @param listenerConfig The configuration for this polling listener.
+ * @param actionDefinitionFactory The action definition factory for the bus.
+ * @throws Exception
*/
- protected void checkParms() throws Exception {
- String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
- m_oActions = sAtt.split(",");
-
- if(m_oActions.length == 0) {
- throw new ConfigurationException("Listener 'actions' list must be specified.");
- }
+ protected AbstractPoller(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(commandListener, listenerConfig, actionDefinitionFactory);
- sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer
- .parseInt(sAtt);
- if (m_iPollMillis < m_iMinPollMillis)
+ String sAtt = listenerConfig.getAttr(PARM_POLL_LTCY);
+ m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer.parseInt(sAtt);
+ if (m_iPollMillis < m_iMinPollMillis) {
m_iPollMillis = m_iMinPollMillis;
- } // ________________________________
-
- /**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
+ }
}
/**
- * Decrement the active thread count.
+ * Polling listener receive implementation.
+ * @return An array of objects polled from the concrete Poller implementation.
*/
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
- * Implement run method for this Runnable <p/> Will continue to run until
- * controlling class (ref in m_oDad) indicates no more looping allowed for
- * all child classes <p/> This condition will not prevent child processes to
- * finish normally
- */
- public void run() {
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
+ protected Object[] receive() {
while (m_oDad.continueLooping()) {
List<Object> olPending = pollForCandidates();
- if (olPending.size() < 1) {
+
+ if (olPending == null || olPending.isEmpty()) {
try {
Thread.sleep(m_iPollMillis);
} catch (InterruptedException e) {
- return;
+ m_oLogger.error("Unexpected thread interupt exception. Not terminating blocking receive!!", e);
}
continue;
- }
-
- for (Object oCurr : olPending) {
- if (m_iQthr >= m_iMaxThr) {
- m_oLogger.info("Waiting for available threads...(max="
- + m_iMaxThr + ")");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
+ } else {
+ // Preprocess all the message objects.
+ // TODO: I really think this is no longer required or a good idea!!
+ for(int i = 0; i < olPending.size(); i++) {
+ olPending.set(i, preProcess(olPending.get(i)));
}
-
- // give the derived class an opportunity to do something
- // before processing current object.
- Object oProcess = null;
- try {
- // REVIEW: Is this "preProcess" step still required now that we've got chained actions??
- if (null == (oProcess = preProcess(oCurr))) {
- continue;
- }
- } catch (Exception ePre) {
- m_oLogger.error("preProcess(Object) FAILED", ePre);
- continue;
- }
-
- ActionProcessingPipeline runner = new ActionProcessingPipeline(oProcess);
- new Thread(runner).start();
+ return olPending.toArray();
}
}
- } // __________________________________
+
+ return null;
+ }
-
/**
- * Action Processing Pipeline.
- * <p/>
- * Runs the actions in a listeners "actions" config on a message payload object received
- * by the listener implementation.
- * <p/>
- * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
- * overall cleanup of these classes. Lots of duplicate code etc.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
+ * Poll for message objects.
+ * @return A list of message objects, or an empty list if there are no message objects.
*/
- private class ActionProcessingPipeline implements Runnable {
-
- private Object object;
-
- /**
- * Private constructor.
- * @param initialObject The inital processing target object.
- */
- private ActionProcessingPipeline(Object initialObject) {
- this.object = initialObject;
- }
+ protected abstract List<Object> pollForCandidates();
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
- String currentAction = null;
-
- // Increment the active thread count for the listener on starting...
- incThreads();
-
- try {
- // Run the object through each ActionProcessor...
- for(String action : m_oActions) {
- ActionDefinition actionDefinition;
-
- currentAction = action.trim();
- actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
- if(actionDefinition == null) {
- throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
- }
-
- // The processing result of each action feeds into the processing of the next action...
- ActionProcessor processor = actionDefinition.getProcessor();
- object = processor.process(object);
-
- if(object == null && action != m_oActions[m_oActions.length - 1]) {
- m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
- break;
- }
- }
- } catch(Throwable thrown) {
- m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
- }
-
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
- }
-
-} // ____________________________________________________________________________
+ /**
+ * Preprocess the message object before returning for pipeline processing.
+ * @param message Message object for preprocessing.
+ * @return The preprocessed message object, or the supplied message unmodified.
+ */
+ protected abstract Object preProcess(Object message);
+ // TODO: Is this "preprocessing" step needed now that we have processing pipelines on listeners???
+}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -64,7 +64,7 @@
* <p/>[2] target file name in case actionClass finishes successfuly
*/
@Override
- public Object preProcess(Object p_o) throws Exception
+ public Object preProcess(Object p_o)
{
if (!(p_o instanceof File))
return null;
@@ -91,32 +91,32 @@
return Arrays.asList((Object[])oaF);
} //________________________________
- protected void checkMyParms() throws Exception
+ private void checkMyParms() throws Exception
{
// INPUT directory and suffix (used for FileFilter)
- String sInpDir = GpListener.obtainAtt(m_oParms,FILE_INPUT_DIR,null);
+ String sInpDir = GpListener.obtainAtt(listenerConfig,FILE_INPUT_DIR,null);
m_oInpDir = new File(new URI(sInpDir));
seeIfOkToWorkOnDir(m_oInpDir);
- m_sInpSfx = GpListener.obtainAtt(m_oParms,FILE_INPUT_SFX,null);
+ m_sInpSfx = GpListener.obtainAtt(listenerConfig,FILE_INPUT_SFX,null);
m_sInpSfx = m_sInpSfx.trim();
if (m_sInpSfx.length()<1)
throw new Exception ("Invalid "+FILE_INPUT_SFX+" attribute");
m_oFFilt = new FileEndsWith(m_sInpSfx);
// WORK suffix (will rename in input directory)
- m_sWrkSfx = GpListener.obtainAtt(m_oParms,FILE_WORK_SFX,".esbWork").trim();
+ m_sWrkSfx = GpListener.obtainAtt(listenerConfig,FILE_WORK_SFX,".esbWork").trim();
if (m_sWrkSfx.length()<1)
throw new Exception ("Invalid "+FILE_WORK_SFX+" attribute");
if (m_sInpSfx.equals(m_sWrkSfx))
throw new Exception("Work suffix must differ from input suffix <"+m_sWrkSfx+">");
// ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
- String sErrDir = GpListener.obtainAtt(m_oParms,FILE_ERROR_DIR,sInpDir);
+ String sErrDir = GpListener.obtainAtt(listenerConfig,FILE_ERROR_DIR,sInpDir);
m_oErrorDir = new File(new URI(sErrDir));
seeIfOkToWorkOnDir(m_oErrorDir);
- m_sErrSfx = GpListener.obtainAtt(m_oParms,FILE_ERROR_SFX,".esbError").trim();
+ m_sErrSfx = GpListener.obtainAtt(listenerConfig,FILE_ERROR_SFX,".esbError").trim();
if (m_sErrSfx.length()<1)
throw new Exception ("Invalid "+FILE_ERROR_SFX+" attribute");
if (m_oErrorDir.equals(m_oInpDir) && m_sInpSfx.equals(m_sErrSfx))
@@ -124,16 +124,16 @@
// Do users wish to delete files that were processed OK ?
- String sPostDel = GpListener.obtainAtt(m_oParms,FILE_POST_DEL,"false").trim();
+ String sPostDel = GpListener.obtainAtt(listenerConfig,FILE_POST_DEL,"false").trim();
m_bPostDel = Boolean.parseBoolean(sPostDel);
if (m_bPostDel)
return;
// POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
- String sPostDir = GpListener.obtainAtt(m_oParms,FILE_POST_DIR,sInpDir);
+ String sPostDir = GpListener.obtainAtt(listenerConfig,FILE_POST_DIR,sInpDir);
m_oPostDir = new File(new URI(sPostDir));
seeIfOkToWorkOnDir(m_oPostDir);
- m_sPostSfx = GpListener.obtainAtt(m_oParms,FILE_POST_SFX,".esbDone").trim();
+ m_sPostSfx = GpListener.obtainAtt(listenerConfig,FILE_POST_SFX,".esbDone").trim();
if (m_oPostDir.equals(m_oInpDir))
{ if (m_sPostSfx.length()<1)
throw new Exception ("Invalid "+FILE_POST_SFX+" attribute");
@@ -173,4 +173,12 @@
} //______________________________
} //____________________________________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ }
+
} //____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -34,7 +34,6 @@
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.AbstractAction;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
import org.jboss.soa.esb.command.CommandQueue;
import org.jboss.soa.esb.command.CommandQueueException;
@@ -544,10 +543,10 @@
* supplied by invoker
*/
public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
- throws Exception {
+ throws ConfigurationException {
String sVal = p_oP.getAttr(p_sAtt);
if ((null == sVal) && (null == p_sDefault))
- throw new Exception("Missing or invalid <" + p_sAtt + "> attribute");
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
return (null != sVal) ? sVal : p_sDefault;
} // ________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -22,25 +22,13 @@
package org.jboss.soa.esb.listeners;
-import java.util.Arrays;
-
-import org.apache.log4j.*;
-
import javax.naming.*;
import javax.jms.*;
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
import org.jboss.soa.esb.helpers.*;
-public class JmsQueueListener implements Runnable {
- // You can override these values at constructor time of your derived class
- protected int m_iSleepForThreads = 3000 // default sleep if no threads
- // available
- , m_iUpperThreadLimit = 10 // just in case - override if you wish
- ;
+public class JmsQueueListener extends AbstractListener {
public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
@@ -62,30 +50,12 @@
protected String m_sSelector;
- protected MessageConsumer m_oRdr;
+ protected MessageConsumer jmsMessageReceiver;
- protected int m_iQthr = 0, m_iMaxThr;
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
-
- protected GpListener m_oDad;
-
- protected DomElement m_oParms;
-
- protected String[] m_oActions;
-
- protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
- public JmsQueueListener(GpListener p_oDad, DomElement p_oParms,
- ActionDefinitionFactory actionDefinitionFactory) throws Exception {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- m_oActionDefinitionFactory = actionDefinitionFactory;
+ public JmsQueueListener(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(commandListener, listenerConfig, actionDefinitionFactory);
checkMyParms();
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
} // __________________________________
/**
@@ -96,36 +66,25 @@
* classpath
*/
protected void checkMyParms() throws Exception {
- String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
- m_oActions = sAtt.split(",");
-
- if(m_oActions.length == 0) {
- throw new ConfigurationException("Listener 'actions' list must be specified.");
- }
-
- sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-
// Third arg is null - Exception will br thrown if listenQueue is not
// found
- String sQueue = GpListener.obtainAtt(m_oParms, LISTEN_QUEUE, null);
+ String sQueue = GpListener.obtainAtt(listenerConfig, LISTEN_QUEUE, null);
// No problem if selector is null - everything in queue will be returned
- m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+ m_sSelector = listenerConfig.getAttr(LISTEN_MSG_SELECTOR);
m_oQconn = null;
m_oQsess = null;
m_oQueue = null;
- String sJndiType = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_TYPE,
+ String sJndiType = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_TYPE,
"jboss");
- String sJndiURL = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_URL,
+ String sJndiURL = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_URL,
"localhost");
Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
sJndiURL);
- String sFactClass = GpListener.obtainAtt(m_oParms,
+ String sFactClass = GpListener.obtainAtt(listenerConfig,
LISTEN_QUEUE_CONN_FACT, "ConnectionFactory");
Object tmp = oJndiCtx.lookup(sFactClass);
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
@@ -135,32 +94,22 @@
m_oQsess = m_oQconn.createQueueSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
m_oQconn.start();
- m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
+ jmsMessageReceiver = m_oQsess.createReceiver(m_oQueue, m_sSelector);
} // ________________________________
- /**
- * Implement run method for this Runnable <p/> Will continue to run until
- * controlling class (ref in m_oDad) indicates no more looping allowed for
- * all child classes <p/> This condition will not prevent child processes to
- * finish normally
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#receive()
*/
- public void run() {
+ @Override
+ protected Object[] receive() {
while (m_oDad.continueLooping()) {
- if (m_iQthr >= m_iMaxThr) {
- m_oLogger.info("Waiting for available threads...");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
- }
- Message oM = null;
+ Message jmsMessage = null;
try {
- oM = m_oRdr.receive(m_oDad.millisToWait());
+ jmsMessage = jmsMessageReceiver.receive(m_oDad.millisToWait());
} catch (JMSException oJ) {
- m_oLogger.error("JMS error on receive", oJ);
+ m_oLogger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
for (int i1 = 0; i1 < 3; i1++)
try {
checkMyParms();
@@ -170,101 +119,38 @@
try {
Thread.sleep(m_iSleepForThreads);
} catch (InterruptedException e1) { // Just return
- return;
+ m_oLogger.error("Unexpected thread interupt exception.", e);
+ return null;
}
}
}
- if (null == oM)
+ if (null == jmsMessage) {
+ // REVIEW: Can this really happen i.e. the JMS
continue;
+ }
- ActionProcessingPipeline runner = new ActionProcessingPipeline(oM);
- new Thread(runner).start();
+ return new Object[] {jmsMessage};
}
- if (null != m_oQsess)
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ if (null != m_oQsess) {
try {
m_oQsess.close();
} catch (Exception e1) {/* Tried my best - Just continue */
}
- if (null != m_oQconn)
+ }
+ if (null != m_oQconn) {
try {
m_oQconn.close();
} catch (Exception e2) {/* Tried my best - Just continue */
}
- } // ______________________________
-
- /**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
- }
-
- /**
- * Decrement the active thread count.
- */
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
- * Action Processing Pipeline.
- * <p/>
- * Runs the actions in a listeners "actions" config on a message payload object received
- * by the listener implementation.
- * <p/>
- * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
- * overall cleanup of these classes. Lots of duplicate code etc.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
- private class ActionProcessingPipeline implements Runnable {
-
- private Object object;
-
- /**
- * Private constructor.
- * @param initialObject The inital processing target object.
- */
- private ActionProcessingPipeline(Object initialObject) {
- this.object = initialObject;
}
-
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
- String currentAction = null;
-
- // Increment the active thread count for the listener on starting...
- incThreads();
-
- try {
- // Run the object through each ActionProcessor...
- for(String action : m_oActions) {
- ActionDefinition actionDefinition;
-
- currentAction = action.trim();
- actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
- if(actionDefinition == null) {
- throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
- }
-
- // The processing result of each action feeds into the processing of the next action...
- ActionProcessor processor = actionDefinition.getProcessor();
- object = processor.process(object);
-
- if(object == null && action != m_oActions[m_oActions.length - 1]) {
- m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
- break;
- }
- }
- } catch(Throwable thrown) {
- m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
- }
-
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
}
-} // ____________________________________________________________________________
+}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-14 15:21:01 UTC (rev 6228)
@@ -174,19 +174,19 @@
protected void checkMyParms() throws Exception
{
- checkAndStoreAtt(m_oParms,SimpleDataSource.DRIVER ,null);
- checkAndStoreAtt(m_oParms,SimpleDataSource.URL ,null);
- checkAndStoreAtt(m_oParms,SimpleDataSource.USER ,"");
- checkAndStoreAtt(m_oParms,SimpleDataSource.PASSWORD ,"");
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER ,null);
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.URL ,null);
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.USER ,"");
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.PASSWORD ,"");
for (TABLE_ATT oCurr : TABLE_ATT.values())
- checkAndStoreAtt(m_oParms,oCurr.toString(),null);
+ checkAndStoreAtt(listenerConfig,oCurr.toString(),null);
- checkAndStoreAtt(m_oParms,OPTIONAL_ATT.whereCondition.toString(),"");
- checkAndStoreAtt(m_oParms,OPTIONAL_ATT.orderBy.toString(),"");
+ checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.whereCondition.toString(),"");
+ checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.orderBy.toString(),"");
String sAtt = OPTIONAL_ATT.inProcessVals.toString();
- checkAndStoreAtt(m_oParms,sAtt,DEFAULT_STATES);
+ checkAndStoreAtt(listenerConfig,sAtt,DEFAULT_STATES);
m_sUpdStates = m_oVals.get(sAtt);
if (m_sUpdStates.length()<4)
throw new Exception("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)");
@@ -219,7 +219,7 @@
} //________________________________
@Override
- protected Object preProcess(Object p_o) throws Exception
+ protected Object preProcess(Object p_o)
{
return p_o;
} //________________________________
@@ -362,4 +362,11 @@
return sb.append(" for update").toString();
} //________________________________
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ }
+
} //____________________________________________________________________________
More information about the jboss-svn-commits
mailing list