[jboss-svn-commits] JBL Code SVN: r6228 - in labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb: actions command listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Sep 14 11:21:06 EDT 2006


Author: tfennelly
Date: 2006-09-14 11:21:01 -0400 (Thu, 14 Sep 2006)
New Revision: 6228

Added:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
Removed:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
Modified:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
ActionProcessor pipeline model changes

Deleted: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -1,43 +0,0 @@
-package org.jboss.soa.esb.actions;
-
-import java.io.Serializable;
-import java.util.Observable;
-import org.apache.log4j.Logger;
-
-import org.jboss.soa.esb.helpers.DomElement;
-import org.jboss.soa.esb.listeners.GpListener;
-
-public abstract class AbstractAction extends Observable	
-	implements Runnable
-{
-	public abstract void 		 processCurrentObject() throws Exception;
-	public abstract Serializable getOkNotification();
-	public abstract Serializable getErrorNotification();
-
-	protected DomElement	m_oParms;
-	protected Object		m_oCurr;
-	protected Logger		m_oLogger = Logger.getLogger(this.getClass());
-
-	protected AbstractAction(DomElement p_oP, Object p_oCurr)
-	{	m_oParms	= p_oP;
-		m_oCurr		= p_oCurr;
-	} //________________________________
-	
-	public void run()
-	{
-		try 
-		{ 
-			processCurrentObject();
-			GpListener.notifyOK(m_oParms,getOkNotification());
-		} 
-		catch (Exception e) 
-		{
-			GpListener.notifyError(m_oParms,e,getErrorNotification());
-		}
-		finally 
-		{	setChanged();
-			notifyObservers(new Integer(-1)); 
-		}
-	} //________________________________
-	
-} //____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -2,7 +2,7 @@
 
 import java.util.Hashtable;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.jboss.soa.esb.helpers.DomElement;
 
@@ -28,7 +28,7 @@
 	private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
 	
 	private String name;
-	private BlockingQueue<String> queue = new SynchronousQueue<String>();
+	private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
 	
 	public void open(DomElement config) throws CommandQueueException {
 		if(config == null) {
@@ -50,6 +50,13 @@
 	 */
 	public void addCommand(String command) {
 		queue.add(command);
+        while(!queue.isEmpty()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
 	}
 	
 	public String receiveCommand(long timeout) throws CommandQueueException {

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -0,0 +1,209 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.util.*;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.*;
+
+/**
+ * Base abstract message listener implementation.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractListener implements Runnable {   
+    
+    // You can override these values at constructor time of your
+    // derived class after calling super(GpListener,DomElement)
+    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
+    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+    protected int m_iQthr = 0, m_iMaxThr;
+
+    protected ThreadGroup m_oThrGrp = null;
+
+    protected Logger m_oLogger;
+
+    protected GpListener m_oDad;
+
+    protected DomElement listenerConfig;
+
+    protected String[] m_oActions;
+
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+    protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        
+        m_oLogger = Logger.getLogger(this.getClass());
+        m_oDad = p_oDad;
+        listenerConfig = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+
+        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+        m_oActions = sAtt.split(",");
+        
+        if(m_oActions.length == 0) {
+            throw new ConfigurationException("Listener 'actions' list must be specified.");
+        }
+
+        sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+        int iMax = Integer.parseInt(sAtt);
+        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+    } // __________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes to
+     * finish normally
+     */
+    public void run() {
+        while (m_oDad.continueLooping()) {
+            Object[] processList = receive();
+            
+            for (Object oCurr : processList) {
+                if (m_iQthr >= m_iMaxThr) {
+                    m_oLogger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+                    try {
+                        Thread.sleep(m_iSleepForThreads);
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                    break;
+                }
+
+                // Spawn a thread and push the message object through the pipeline...
+                ActionProcessingPipeline runner = new ActionProcessingPipeline(oCurr);
+                new Thread(runner).start();
+            }
+        }
+        
+        close();
+    }
+    
+    /**
+     * Receive message from underlying channel implementation.
+     * <p/>
+     * Implementations must perform a blocking receive.
+     * @return An array of Objects received on the channel.
+     */
+    protected abstract Object[] receive();
+    
+    /**
+     * Close the listener implemenation.
+     * <p/>
+     * Allows the listener to perform relevant close/cleanup tasks.
+     */
+    protected abstract void close();
+
+    /**
+     * Increment the active thread count.
+     */
+    private void incThreads() {
+        m_iQthr++;
+    }
+
+    /**
+     * Decrement the active thread count.
+     */
+    private void decThreads() {
+        m_iQthr--;
+    }
+    
+    /**
+     * Action Processing Pipeline.
+     * <p/>
+     * Runs the actions in a listeners "actions" config on a message payload object received
+     * by the listener implementation.
+     * <p/>
+     * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
+     * overall cleanup of these classes.  Lots of duplicate code etc. 
+     * 
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    private class ActionProcessingPipeline implements Runnable {
+        
+        private Object object;
+             
+        /**
+         * Private constructor.
+         * @param initialObject The inital processing target object.
+         */
+        private ActionProcessingPipeline(Object initialObject) {
+            this.object = initialObject;
+        }
+
+        /* (non-Javadoc)
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            String currentAction = null;
+            
+            // Increment the active thread count for the listener on starting...
+            incThreads();
+            
+            try {
+                // Run the object through each ActionProcessor...
+                for(String action : m_oActions) {
+                    ActionDefinition actionDefinition;
+
+                    currentAction = action.trim();
+                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if(actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+                    
+                    // The processing result of each action feeds into the processing of the next action...
+                    ActionProcessor processor = actionDefinition.getProcessor();
+                    try {
+                        object = processor.process(object);
+                    } catch (Exception e) {
+                        GpListener.notifyError(listenerConfig, e, processor.getErrorNotification(object));
+                        throw e;
+                    }
+                    
+                    if(object == null && action != m_oActions[m_oActions.length - 1]) {
+                        m_oLogger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+                        break;
+                    }
+                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
+                    GpListener.notifyOK(listenerConfig, processor.getOkNotification(object));
+                }
+            } catch(Throwable thrown) {
+                m_oLogger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
+            }
+            
+            // Decrement the active thread count for the listener on completion...
+            decThreads();
+        }
+    }
+    
+} // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -24,205 +24,86 @@
 
 import java.util.*;
 
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
 import org.jboss.soa.esb.helpers.*;
 
-public abstract class AbstractPoller implements Runnable {
-    protected abstract List<Object> pollForCandidates();
+/**
+ * Abstract Polling Listener.
+ * <p/>
+ * Polling listeners are listener implementations that periodically poll for message objects
+ * that require processing.  This type of listener implementation is required where the underlying
+ * message channel doesn't support a blocking receive operation.
+ * 
+ * @author Esteban
+ */
+public abstract class AbstractPoller extends AbstractListener {
 
-    protected abstract Object preProcess(Object p_o) throws Exception;
-
     // You can override these values at constructor time of your
     // derived class after calling super(GpListener,DomElement)
     protected int m_iMinPollMillis = 3000 // minimum polling interval
             , m_iDfltPollMillis = 20000 // default polling interval
-            , m_iSleepForThreads = 3000 // default sleep if no threads available
-            , m_iUpperThreadLimit = 10 // just in case - override if you wish
             ;
 
     public static final String PARM_POLL_LTCY = "pollLatencySecs";
 
-    protected int m_iQthr = 0, m_iMaxThr;
-
     protected int m_iPollMillis;
 
-    protected ThreadGroup m_oThrGrp = null;
-
-    protected Logger m_oLogger;
-
-    protected GpListener m_oDad;
-
-    protected DomElement m_oParms;
-
-    protected String[] m_oActions;
-
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
-    protected AbstractPoller(GpListener p_oDad, DomElement p_oParms,
-            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-        m_oLogger = Logger.getLogger(this.getClass());
-        m_oDad = p_oDad;
-        m_oParms = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
-        checkParms();
-    } // __________________________________
-
     /**
-     * Check for mandatory and optional attributes in parameter tree
-     * 
-     * @throws Exception -
-     *             if actionClass not specified or not in classpath or invalid
-     *             int values for maxThreads or pollLatencySecs
+     * Construct an abstract polling listener.
+     * @param commandListener The command listener.
+     * @param listenerConfig The configuration for this polling listener.
+     * @param actionDefinitionFactory The action definition factory for the bus.
+     * @throws Exception
      */
-    protected void checkParms() throws Exception {
-        String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
-        m_oActions = sAtt.split(",");
-        
-        if(m_oActions.length == 0) {
-            throw new ConfigurationException("Listener 'actions' list must be specified.");
-        }
+    protected AbstractPoller(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        super(commandListener, listenerConfig, actionDefinitionFactory);
 
-        sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
-        int iMax = Integer.parseInt(sAtt);
-        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-
-        sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
-        m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer
-                .parseInt(sAtt);
-        if (m_iPollMillis < m_iMinPollMillis)
+        String sAtt = listenerConfig.getAttr(PARM_POLL_LTCY);
+        m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer.parseInt(sAtt);
+        if (m_iPollMillis < m_iMinPollMillis) {
             m_iPollMillis = m_iMinPollMillis;
-    } // ________________________________
-
-    /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
+        }
     }
 
     /**
-     * Decrement the active thread count.
+     * Polling listener receive implementation.
+     * @return An array of objects polled from the concrete Poller implementation.
      */
-    private void decThreads() {
-        m_iQthr--;
-    }
-
-    /**
-     * Implement run method for this Runnable <p/> Will continue to run until
-     * controlling class (ref in m_oDad) indicates no more looping allowed for
-     * all child classes <p/> This condition will not prevent child processes to
-     * finish normally
-     */
-    public void run() {
-        m_oThrGrp = new ThreadGroup(m_oParms.getName());
+    protected Object[] receive() {
         while (m_oDad.continueLooping()) {
             List<Object> olPending = pollForCandidates();
-            if (olPending.size() < 1) {
+            
+            if (olPending == null || olPending.isEmpty()) {
                 try {
                     Thread.sleep(m_iPollMillis);
                 } catch (InterruptedException e) {
-                    return;
+                    m_oLogger.error("Unexpected thread interupt exception.  Not terminating blocking receive!!", e);
                 }
                 continue;
-            }
-
-            for (Object oCurr : olPending) {
-                if (m_iQthr >= m_iMaxThr) {
-                    m_oLogger.info("Waiting for available threads...(max="
-                            + m_iMaxThr + ")");
-                    try {
-                        Thread.sleep(m_iSleepForThreads);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    break;
+            } else {
+                // Preprocess all the message objects.
+                // TODO: I really think this is no longer required or a good idea!!
+                for(int i = 0; i < olPending.size(); i++) {
+                    olPending.set(i, preProcess(olPending.get(i)));
                 }
-
-                // give the derived class an opportunity to do something
-                // before processing current object.
-                Object oProcess = null;
-                try {
-                    // REVIEW: Is this "preProcess" step still required now that we've got chained actions??
-                    if (null == (oProcess = preProcess(oCurr))) {
-                        continue;
-                    }
-                } catch (Exception ePre) {
-                    m_oLogger.error("preProcess(Object) FAILED", ePre);
-                    continue;
-                }
-
-                ActionProcessingPipeline runner = new ActionProcessingPipeline(oProcess);
-                new Thread(runner).start();
+                return olPending.toArray();
             }
         }
-    } // __________________________________
+        
+        return null;
+    }
 
-    
     /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload object received
-     * by the listener implementation.
-     * <p/>
-     * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
-     * overall cleanup of these classes.  Lots of duplicate code etc. 
-     * 
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
+     * Poll for message objects.
+     * @return A list of message objects, or an empty list if there are no message objects.
      */
-    private class ActionProcessingPipeline implements Runnable {
-        
-        private Object object;
-             
-        /**
-         * Private constructor.
-         * @param initialObject The inital processing target object.
-         */
-        private ActionProcessingPipeline(Object initialObject) {
-            this.object = initialObject;
-        }
+    protected abstract List<Object> pollForCandidates();
 
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            
-            // Increment the active thread count for the listener on starting...
-            incThreads();
-            
-            try {
-                // Run the object through each ActionProcessor...
-                for(String action : m_oActions) {
-                    ActionDefinition actionDefinition;
-
-                    currentAction = action.trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if(actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-                    
-                    // The processing result of each action feeds into the processing of the next action...
-                    ActionProcessor processor = actionDefinition.getProcessor();
-                    object = processor.process(object);
-                    
-                    if(object == null && action != m_oActions[m_oActions.length - 1]) {
-                        m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
-                        break;
-                    }
-                }
-            } catch(Throwable thrown) {
-                m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
-            }
-            
-            // Decrement the active thread count for the listener on completion...
-            decThreads();
-        }
-    }
-    
-} // ____________________________________________________________________________
+    /**
+     * Preprocess the message object before returning for pipeline processing.
+     * @param message Message object for preprocessing.
+     * @return The preprocessed message object, or the supplied message unmodified.
+     */
+    protected abstract Object preProcess(Object message);
+    // TODO: Is this "preprocessing" step needed now that we have processing pipelines on listeners???
+}

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -64,7 +64,7 @@
      * <p/>[2] target file name in case actionClass finishes successfuly
      */
 	@Override
-	public Object preProcess(Object p_o) throws Exception 
+	public Object preProcess(Object p_o)
 	{
 		if (!(p_o instanceof File))
 			return null;
@@ -91,32 +91,32 @@
 		return Arrays.asList((Object[])oaF);
 	} //________________________________
 
-	protected void checkMyParms() throws Exception
+	private void checkMyParms() throws Exception
     { 
 	//  INPUT directory and suffix  (used for FileFilter)
-	  String sInpDir = GpListener.obtainAtt(m_oParms,FILE_INPUT_DIR,null);
+	  String sInpDir = GpListener.obtainAtt(listenerConfig,FILE_INPUT_DIR,null);
       m_oInpDir = new File(new URI(sInpDir));
       seeIfOkToWorkOnDir(m_oInpDir);
 
-      m_sInpSfx  = GpListener.obtainAtt(m_oParms,FILE_INPUT_SFX,null);
+      m_sInpSfx  = GpListener.obtainAtt(listenerConfig,FILE_INPUT_SFX,null);
       m_sInpSfx  = m_sInpSfx.trim();
       if (m_sInpSfx.length()<1)
     	  throw new Exception ("Invalid "+FILE_INPUT_SFX+" attribute");
 	  m_oFFilt = new FileEndsWith(m_sInpSfx);
 
 	//  WORK suffix (will rename in input directory)
-      m_sWrkSfx	= GpListener.obtainAtt(m_oParms,FILE_WORK_SFX,".esbWork").trim();
+      m_sWrkSfx	= GpListener.obtainAtt(listenerConfig,FILE_WORK_SFX,".esbWork").trim();
       if (m_sWrkSfx.length()<1)
     	  throw new Exception ("Invalid "+FILE_WORK_SFX+" attribute");
       if (m_sInpSfx.equals(m_sWrkSfx))
     	  throw new Exception("Work suffix must differ from input suffix <"+m_sWrkSfx+">");
 
     //    ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
-      String sErrDir = GpListener.obtainAtt(m_oParms,FILE_ERROR_DIR,sInpDir);
+      String sErrDir = GpListener.obtainAtt(listenerConfig,FILE_ERROR_DIR,sInpDir);
       m_oErrorDir = new File(new URI(sErrDir));
       seeIfOkToWorkOnDir(m_oErrorDir);
 
-      m_sErrSfx  = GpListener.obtainAtt(m_oParms,FILE_ERROR_SFX,".esbError").trim();
+      m_sErrSfx  = GpListener.obtainAtt(listenerConfig,FILE_ERROR_SFX,".esbError").trim();
       if (m_sErrSfx.length()<1)
     	  throw new Exception ("Invalid "+FILE_ERROR_SFX+" attribute");
       if (m_oErrorDir.equals(m_oInpDir) && m_sInpSfx.equals(m_sErrSfx))
@@ -124,16 +124,16 @@
 
 
    //    Do users wish to delete files that were processed OK ?
-      String sPostDel = GpListener.obtainAtt(m_oParms,FILE_POST_DEL,"false").trim();
+      String sPostDel = GpListener.obtainAtt(listenerConfig,FILE_POST_DEL,"false").trim();
       m_bPostDel = Boolean.parseBoolean(sPostDel);
       if (m_bPostDel)
     	  return;
 
     //    POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
-      String sPostDir = GpListener.obtainAtt(m_oParms,FILE_POST_DIR,sInpDir);
+      String sPostDir = GpListener.obtainAtt(listenerConfig,FILE_POST_DIR,sInpDir);
       m_oPostDir = new File(new URI(sPostDir));
       seeIfOkToWorkOnDir(m_oPostDir);
-      m_sPostSfx  = GpListener.obtainAtt(m_oParms,FILE_POST_SFX,".esbDone").trim();
+      m_sPostSfx  = GpListener.obtainAtt(listenerConfig,FILE_POST_SFX,".esbDone").trim();
       if (m_oPostDir.equals(m_oInpDir))
       {	if (m_sPostSfx.length()<1)
     	  throw new Exception ("Invalid "+FILE_POST_SFX+" attribute");
@@ -173,4 +173,12 @@
       } //______________________________
     } //____________________________________________________
 
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+     */
+    @Override
+    protected void close() {
+    }
+    
 } //____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -34,7 +34,6 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.AbstractAction;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
 import org.jboss.soa.esb.command.CommandQueue;
 import org.jboss.soa.esb.command.CommandQueueException;
@@ -544,10 +543,10 @@
 	 *             supplied by invoker
 	 */
 	public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
-			throws Exception {
+			throws ConfigurationException {
 		String sVal = p_oP.getAttr(p_sAtt);
 		if ((null == sVal) && (null == p_sDefault))
-			throw new Exception("Missing or invalid <" + p_sAtt + "> attribute");
+			throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
 
 		return (null != sVal) ? sVal : p_sDefault;
 	} // ________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -22,25 +22,13 @@
 
 package org.jboss.soa.esb.listeners;
 
-import java.util.Arrays;
-
-import org.apache.log4j.*;
-
 import javax.naming.*;
 import javax.jms.*;
 
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
 import org.jboss.soa.esb.helpers.*;
 
-public class JmsQueueListener implements Runnable {
-    // You can override these values at constructor time of your derived class
-    protected int m_iSleepForThreads = 3000 // default sleep if no threads
-                                            // available
-            , m_iUpperThreadLimit = 10 // just in case - override if you wish
-            ;
+public class JmsQueueListener extends AbstractListener {
 
     public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
 
@@ -62,30 +50,12 @@
 
     protected String m_sSelector;
 
-    protected MessageConsumer m_oRdr;
+    protected MessageConsumer jmsMessageReceiver;
 
-    protected int m_iQthr = 0, m_iMaxThr;
 
-    protected ThreadGroup m_oThrGrp = null;
-
-    protected Logger m_oLogger;
-
-    protected GpListener m_oDad;
-
-    protected DomElement m_oParms;
-
-    protected String[] m_oActions;
-
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
-    public JmsQueueListener(GpListener p_oDad, DomElement p_oParms,
-            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-        m_oLogger = Logger.getLogger(this.getClass());
-        m_oDad = p_oDad;
-        m_oParms = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
+    public JmsQueueListener(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        super(commandListener, listenerConfig, actionDefinitionFactory);
         checkMyParms();
-        m_oThrGrp = new ThreadGroup(m_oParms.getName());
     } // __________________________________
 
     /**
@@ -96,36 +66,25 @@
      *             classpath
      */
     protected void checkMyParms() throws Exception {
-        String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
-        m_oActions = sAtt.split(",");
-        
-        if(m_oActions.length == 0) {
-            throw new ConfigurationException("Listener 'actions' list must be specified.");
-        }
-
-        sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
-        int iMax = Integer.parseInt(sAtt);
-        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-
         // Third arg is null - Exception will br thrown if listenQueue is not
         // found
-        String sQueue = GpListener.obtainAtt(m_oParms, LISTEN_QUEUE, null);
+        String sQueue = GpListener.obtainAtt(listenerConfig, LISTEN_QUEUE, null);
 
         // No problem if selector is null - everything in queue will be returned
-        m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+        m_sSelector = listenerConfig.getAttr(LISTEN_MSG_SELECTOR);
 
         m_oQconn = null;
         m_oQsess = null;
         m_oQueue = null;
 
-        String sJndiType = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_TYPE,
+        String sJndiType = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_TYPE,
                 "jboss");
-        String sJndiURL = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_URL,
+        String sJndiURL = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_URL,
                 "localhost");
         Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
                 sJndiURL);
 
-        String sFactClass = GpListener.obtainAtt(m_oParms,
+        String sFactClass = GpListener.obtainAtt(listenerConfig,
                 LISTEN_QUEUE_CONN_FACT, "ConnectionFactory");
         Object tmp = oJndiCtx.lookup(sFactClass);
         QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
@@ -135,32 +94,22 @@
         m_oQsess = m_oQconn.createQueueSession(false,
                 TopicSession.AUTO_ACKNOWLEDGE);
         m_oQconn.start();
-        m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
+        jmsMessageReceiver = m_oQsess.createReceiver(m_oQueue, m_sSelector);
 
     } // ________________________________
 
-    /**
-     * Implement run method for this Runnable <p/> Will continue to run until
-     * controlling class (ref in m_oDad) indicates no more looping allowed for
-     * all child classes <p/> This condition will not prevent child processes to
-     * finish normally
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#receive()
      */
-    public void run() {
+    @Override
+    protected Object[] receive() {
         while (m_oDad.continueLooping()) {
-            if (m_iQthr >= m_iMaxThr) {
-                m_oLogger.info("Waiting for available threads...");
-                try {
-                    Thread.sleep(m_iSleepForThreads);
-                } catch (InterruptedException e) {
-                    return;
-                }
-                break;
-            }
-            Message oM = null;
+            Message jmsMessage = null;
             try {
-                oM = m_oRdr.receive(m_oDad.millisToWait());
+                jmsMessage = jmsMessageReceiver.receive(m_oDad.millisToWait());
             } catch (JMSException oJ) {
-                m_oLogger.error("JMS error on receive", oJ);
+                m_oLogger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
                 for (int i1 = 0; i1 < 3; i1++)
                     try {
                         checkMyParms();
@@ -170,101 +119,38 @@
                         try {
                             Thread.sleep(m_iSleepForThreads);
                         } catch (InterruptedException e1) { // Just return
-                            return;
+                            m_oLogger.error("Unexpected thread interupt exception.", e);
+                            return null;
                         }
                     }
             }
-            if (null == oM)
+            if (null == jmsMessage) {
+                // REVIEW: Can this really happen i.e. the JMS
                 continue;
+            }
 
-            ActionProcessingPipeline runner = new ActionProcessingPipeline(oM);
-            new Thread(runner).start();
+            return new Object[] {jmsMessage};
         }
-        if (null != m_oQsess)
+        
+        return null;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+     */
+    @Override
+    protected void close() {
+        if (null != m_oQsess) {
             try {
                 m_oQsess.close();
             } catch (Exception e1) {/* Tried my best - Just continue */
             }
-        if (null != m_oQconn)
+        }
+        if (null != m_oQconn) {
             try {
                 m_oQconn.close();
             } catch (Exception e2) {/* Tried my best - Just continue */
             }
-    } // ______________________________
-
-    /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
-    }
-
-    /**
-     * Decrement the active thread count.
-     */
-    private void decThreads() {
-        m_iQthr--;
-    }
-
-    /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload object received
-     * by the listener implementation.
-     * <p/>
-     * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
-     * overall cleanup of these classes.  Lots of duplicate code etc. 
-     * 
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
-     */
-    private class ActionProcessingPipeline implements Runnable {
-        
-        private Object object;
-             
-        /**
-         * Private constructor.
-         * @param initialObject The inital processing target object.
-         */
-        private ActionProcessingPipeline(Object initialObject) {
-            this.object = initialObject;
         }
-
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            
-            // Increment the active thread count for the listener on starting...
-            incThreads();
-            
-            try {
-                // Run the object through each ActionProcessor...
-                for(String action : m_oActions) {
-                    ActionDefinition actionDefinition;
-
-                    currentAction = action.trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if(actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-                    
-                    // The processing result of each action feeds into the processing of the next action...
-                    ActionProcessor processor = actionDefinition.getProcessor();
-                    object = processor.process(object);
-                    
-                    if(object == null && action != m_oActions[m_oActions.length - 1]) {
-                        m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + processor.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
-                        break;
-                    }
-                }
-            } catch(Throwable thrown) {
-                m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            }
-            
-            // Decrement the active thread count for the listener on completion...
-            decThreads();
-        }
     }
-} // ____________________________________________________________________________
+} 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-14 15:20:03 UTC (rev 6227)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-14 15:21:01 UTC (rev 6228)
@@ -174,19 +174,19 @@
 
 	protected void checkMyParms() throws Exception
     { 
-	  checkAndStoreAtt(m_oParms,SimpleDataSource.DRIVER		,null);
-	  checkAndStoreAtt(m_oParms,SimpleDataSource.URL		,null);
-	  checkAndStoreAtt(m_oParms,SimpleDataSource.USER		,"");
-	  checkAndStoreAtt(m_oParms,SimpleDataSource.PASSWORD	,"");
+	  checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER		,null);
+	  checkAndStoreAtt(listenerConfig,SimpleDataSource.URL		,null);
+	  checkAndStoreAtt(listenerConfig,SimpleDataSource.USER		,"");
+	  checkAndStoreAtt(listenerConfig,SimpleDataSource.PASSWORD	,"");
 	  
 	  for (TABLE_ATT oCurr : TABLE_ATT.values())
-		  checkAndStoreAtt(m_oParms,oCurr.toString(),null);
+		  checkAndStoreAtt(listenerConfig,oCurr.toString(),null);
 	  
-	  checkAndStoreAtt(m_oParms,OPTIONAL_ATT.whereCondition.toString(),"");
-	  checkAndStoreAtt(m_oParms,OPTIONAL_ATT.orderBy.toString(),"");
+	  checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.whereCondition.toString(),"");
+	  checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.orderBy.toString(),"");
 
 	  String sAtt = OPTIONAL_ATT.inProcessVals.toString();
-	  checkAndStoreAtt(m_oParms,sAtt,DEFAULT_STATES);
+	  checkAndStoreAtt(listenerConfig,sAtt,DEFAULT_STATES);
 	  m_sUpdStates = m_oVals.get(sAtt);
 	  if (m_sUpdStates.length()<4)
 		  throw new Exception("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)");
@@ -219,7 +219,7 @@
     } //________________________________
 
 	@Override
-	protected Object preProcess(Object p_o) throws Exception 
+	protected Object preProcess(Object p_o) 
 	{
 		return p_o;
 	} //________________________________
@@ -362,4 +362,11 @@
 		return sb.append(" for update").toString();
 	} //________________________________
 
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+     */
+    @Override
+    protected void close() {
+    }
+
 } //____________________________________________________________________________




More information about the jboss-svn-commits mailing list