[jboss-svn-commits] JBL Code SVN: r5590 - in labs/jbossesb/trunk/product/core/listeners/src: . org/jboss/soa/esb org/jboss/soa/esb/actions org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Aug 7 22:20:28 EDT 2006


Author: estebanschifman
Date: 2006-08-07 22:20:20 -0400 (Mon, 07 Aug 2006)
New Revision: 5590

Added:
   labs/jbossesb/trunk/product/core/listeners/src/log4j.properties
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Removed:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
Heavy refactoring of the listeners package.
New package org.jboss.soa.esb.actions added


Added: labs/jbossesb/trunk/product/core/listeners/src/log4j.properties
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/log4j.properties	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/log4j.properties	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,7 @@
+### direct log messages to stdout ###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+### set log levels - for more verbose logging change 'info' to 'debug' ###
+log4j.rootLogger=info, stdout
\ No newline at end of file

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,25 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.util.Observable;
+import org.apache.log4j.Logger;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Common base for the action classes that process one object at a time
+ * <p/>An action is a Runnable, and it is also an Observable so that
+ * interested parties can register themselves as observers of the action
+ * <p/>Derived classes have to supply run() and the three abstract methods
+ * declared at the end of this class
+ */
+public abstract class AbstractAction extends Observable implements Runnable
+{
+	/** Parameter tree received at construction time */
+	DomElement	m_oParms;
+	/** The object that this action has to process */
+	Object		m_oCurr;
+	/** log4j logger named after the concrete (run time) class */
+	Logger		m_oLogger = Logger.getLogger(getClass());
+
+/**
+ * Keep both arguments for later use by derived classes
+ * @param p_oP DomElement - parameter tree
+ * @param p_oCurr Object  - the object to be processed
+ */
+	protected AbstractAction(DomElement p_oP, Object p_oCurr)
+	{
+		this.m_oParms	= p_oP;
+		this.m_oCurr	= p_oCurr;
+	} //________________________________
+
+/**
+ * Do the actual work on the current object
+ * @throws Exception - if the object could not be processed
+ */
+	public abstract void processCurrentObject() throws Exception;
+
+/** @return Serializable - notification content for a successful run */
+	public abstract Serializable getOkNotification();
+
+/** @return Serializable - notification content for a failed run */
+	public abstract Serializable getErrorNotification();
+
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,74 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.File;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+
+public abstract class AbstractFileAction extends AbstractAction 
+{
+/**
+ * Extend this class if you need to implement an action class intended to
+ * process one file at a time
+ * <br/>See FileCopier as an example
+ * @param p_oP DomElement - Parameter tree passed by controlling listener
+ * @param p_oCurr Object  - This is the object that's going to get processed
+ * <br/>All classes that extend AbstractFileAction receive an instance of the internal 
+ * <br/>class Params containing info needed to rename the file in different stages
+ * <br/>of processing 
+ */
+	protected AbstractFileAction(DomElement p_oP, Object p_oCurr)
+	{	super(p_oP,p_oCurr);
+	} //________________________________
+
+	public static class Params
+	{
+		public boolean	bPostDelete;
+		public File		oInpF	,oWrkF	,oErrF	,oDoneF;
+		public String toString() { return oInpF.toString(); }
+	} //________________________________
+
+	public boolean isPostDelete() { return ((Params)m_oCurr).bPostDelete; }
+	public File getInputFile() { return ((Params)m_oCurr).oInpF; }
+	public File getWorkFile()  { return ((Params)m_oCurr).oWrkF; }
+	public File getErrorFile() { return ((Params)m_oCurr).oErrF; }
+	public File getDoneOkFile(){ return ((Params)m_oCurr).oDoneF; }
+	
+	public boolean renameToError() { return getWorkFile().renameTo(getErrorFile()); }
+	public boolean renameToDone () { return getWorkFile().renameTo(getDoneOkFile()); }
+
+/**
+ * Overrides run() in AbstractAction
+ * <p/>Files processed by action classes need to be renamed during processing to
+ * disable other listeners (or other threads launched by the same listener that
+ * started this thread) to pick up the same file
+ * <br/>Once processing ends a suffix will be added to the name of the original
+ * file that has been processed.  The suffix will be different according to the
+ * result of processing (OK or Exception).  Files could be moved to different
+ * directories as well
+ * <br/> Parameters for these options can be provided at run time in the
+ * DomElement (arg 0 in constructor)
+ */
+	public void run()
+	{
+		try 
+		{ 
+			processCurrentObject();
+			if (isPostDelete())
+				getWorkFile().delete();
+			else
+				renameToDone();
+			GpListener.notifyOK(m_oParms,getOkNotification());
+		} 
+		catch (Exception e) 
+		{
+			renameToError();
+			GpListener.notifyError(m_oParms,e,getErrorNotification());
+		}
+		finally 
+		{	setChanged();
+			notifyObservers(new Integer(-1)); 
+		}
+	} //________________________________
+	
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,134 @@
+package org.jboss.soa.esb.actions;
+
+import java.util.*;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import javax.sql.DataSource;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.listeners.SqlTablePoller.ROW_STATE;
+
+/**
+ * Extend this class if you need to implement an action class intended to
+ * process one row of a table at a time
+ * <p/>The object to process is expected to be an instance of the nested class
+ * Params (every accessor in this class casts the current object to Params)
+ * <p/>The row is moved from Pending to Working before processing, and to Done
+ * or Error once processing has ended
+ */
+public abstract class AbstractSqlRowAction  extends AbstractAction 
+{
+	protected JdbcCleanConn	m_oConn;
+	protected PreparedStatement m_PSsel4U	,m_PSupd;
+
+/**
+ * Obtain a connection and prepare the 'select for update' and 'update'
+ * statements for the current row
+ * @param p_oP DomElement - Parameter tree, also used to build the data source
+ * @param p_oCurr Object  - The row to process (an instance of Params)
+ * @throws Exception - if the connection or the statements can't be set up.
+ * The connection (if already obtained) is released before rethrowing
+ */
+	protected AbstractSqlRowAction(DomElement p_oP, Object p_oCurr) throws Exception
+	{
+		super(p_oP,p_oCurr);
+		try
+		{	DataSource oDS	= new SimpleDataSource(m_oParms);
+			m_oConn			= new JdbcCleanConn(oDS);
+			m_PSsel4U		= m_oConn.prepareStatement(getSel4Upd());
+			m_PSupd			= m_oConn.prepareStatement(getUpdStmt());	
+
+			// key number N is parameter N in the select statement and
+			// parameter N+1 in the update statement (parameter 1 of the
+			// update statement is the new status - see changeStatus)
+			int iParm=0;
+			for (String sColName : getKeys())
+			{	Object oVal	= getColumnValue(sColName);
+				++iParm;
+				m_PSsel4U.setObject	(iParm	,oVal);
+				m_PSupd.setObject	(iParm+1,oVal);
+			}
+		}
+		catch (Exception e)
+		{	// run() will never be called on a half built action, so this
+			// is the only place where the connection can be given back
+			if (null!=m_oConn)
+				m_oConn.release();
+			throw e;
+		}
+	} //________________________________
+	
+/**
+ * Information about the row to process: SQL statements, names of columns
+ * and keys, the string of status codes, and the column values of the row
+ */
+	public static class Params
+	{
+		public String	sUpdStates;
+		public String[]	saCols	,saKeys;
+		public String	sSel4Upd,sUpdate;
+		public Map<String,Object> omVals;
+		// String.valueOf() so that an unset map prints as "null"
+		// instead of throwing a NullPointerException
+		public String toString() { return String.valueOf(omVals); }
+	} //________________________________
+
+	protected String getSel4Upd()
+	{	return ((Params)m_oCurr).sSel4Upd;
+	} //________________________________
+
+	protected String getUpdStmt()
+	{	return ((Params)m_oCurr).sUpdate;
+	} //________________________________
+
+/**
+ * @param p_oState ROW_STATE - the state for which the code is requested
+ * @return String - the single character found in Params.sUpdStates at the
+ * position given by the ordinal of p_oState
+ */
+	protected String getStatus(ROW_STATE p_oState)
+	{	int iPos = p_oState.ordinal();
+		return ((Params)m_oCurr).sUpdStates.substring(iPos,iPos+1);
+	} //________________________________
+
+	protected Object getColumnValue(String p_sKey)
+	{	return ((Params)m_oCurr).omVals.get(p_sKey);
+	} //________________________________
+
+	protected String[] getKeys()
+	{	return ((Params)m_oCurr).saKeys;
+	} //________________________________
+
+/**
+ * Change the status of the current row, and commit
+ * @param pFrom ROW_STATE - status that the row is required to have now
+ * (null = don't check the current status)
+ * @param pTo ROW_STATE - the new status
+ * @return boolean - false if the row was not found, or if it's current
+ * status is not the required one (nothing is updated in both cases)
+ * @throws Exception - on problems executing the SQL statements
+ */
+	private boolean changeStatus (ROW_STATE pFrom, ROW_STATE pTo) throws Exception
+	{	ResultSet RS = m_oConn.execQueryWait(m_PSsel4U,5);
+		if (! RS.next())
+	  		return false;
+	  	if (null!=pFrom)
+	  	{	// a null or empty status column can't match any state
+	  		String sOldStatus = RS.getString(1);
+	  		boolean bMatch = (null!=sOldStatus && sOldStatus.length()>0
+	  			&& sOldStatus.substring(0,1).equalsIgnoreCase(getStatus(pFrom)));
+	  		if (! bMatch)
+	  		{	m_oConn.rollback();
+	  			return false;
+	  		}
+	  	}
+	  	m_PSupd.setString(1,getStatus(pTo));
+	  	m_oConn.execUpdWait(m_PSupd,5);
+	  	m_oConn.commit();
+	  	
+		return true;
+	} //______________________________
+
+/**
+ * Implementation of run() (AbstractAction is a Runnable)
+ * <p/>The row is processed only if it's status can be changed from Pending
+ * to Working.  After processing, status is changed to Done or Error, and
+ * the corresponding notification is sent
+ * <br/>Whatever the outcome, observers are notified with Integer -1 and
+ * the connection is rolled back and released when this method ends
+ */
+	public void run()
+	{
+		try
+		{
+	  		// will only continue if it can change status to "Working"
+			boolean bWorking;
+			try
+			{	bWorking = changeStatus(ROW_STATE.Pending,ROW_STATE.Working);
+			}
+			catch(Exception e)
+			{	m_oLogger.error("Unable to change status to Working",e);
+				return;
+			}
+			if (! bWorking)
+			{	// rollback and release are taken care of in the finally block
+				m_oLogger.warn("Unable to change status to Working");
+				return;
+			}
+
+			try 
+			{ 
+				processCurrentObject();
+				if (! changeStatus(ROW_STATE.Working,ROW_STATE.Done))
+					m_oLogger.warn("Unable to change status to Done");
+				m_oConn.commit();
+				GpListener.notifyOK(m_oParms,getOkNotification());
+			} 
+			catch (Exception e) 
+			{
+				try 
+				{	changeStatus(ROW_STATE.Working,ROW_STATE.Error); 
+					m_oConn.commit();
+				}
+				catch (Exception eErr)
+				{
+					m_oLogger.error("Unable to change status to ERROR - Really weird - shouldn't happen",eErr);
+				}
+				GpListener.notifyError(m_oParms,e,getErrorNotification());
+			}
+		}
+		finally 
+		{	setChanged();
+			notifyObservers(Integer.valueOf(-1));
+			if (null!=m_oConn)
+			{	try	{	m_oConn.rollback(); }
+				catch(Exception e)	{ /* OK just continue */ }
+				m_oConn.release();
+			}
+		}
+	} //________________________________
+	
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,75 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods 
+ * in your own action class
+ * 
+ * @author Esteban
+ *
+ */
+public class DummyAction extends AbstractAction 
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP DomElement - the configuration tree
+ * @param p_oCurr Object  - the object to process
+ */
+	public DummyAction(DomElement p_oP, Object p_oCurr) throws Exception
+	{
+		super(p_oP,p_oCurr);
+	}
+	
+/**
+ * Process the current object and send the OK or ERROR notification
+ * <br/>Observers are always notified with Integer -1 when this method ends
+ */
+	public void run()
+	{
+		try 
+		{ 
+			processCurrentObject();
+			GpListener.notifyOK(m_oParms,getOkNotification());
+		} 
+		catch (Exception e) 
+		{
+			GpListener.notifyError(m_oParms,e,getErrorNotification());
+		}
+		finally 
+		{	setChanged();
+			notifyObservers(Integer.valueOf(-1)); 
+		}
+	} //________________________________
+	
+/**
+ * Just log the object that was received
+ * <br/>String concatenation prints a null object as "null", the same way
+ * the notification methods do
+ */
+	@Override
+	public void processCurrentObject() throws Exception 
+	{
+		m_oLogger.info("processObject was called with <<"
+				+m_oCurr+">>");
+	} //________________________________
+	
+	// HH (hour of day 0-23) instead of hh (1-12): there is no AM/PM marker
+	// in the pattern, so a 12 hour clock would produce ambiguous stamps
+	private final SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+	private String getStamp() 
+		{ return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+/** @return Serializable - time stamp, "Notif OK" and the current object */
+	@Override
+	public Serializable getOkNotification()
+	{
+		return getStamp()+" Notif OK - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+
+/** @return Serializable - time stamp, "Notif ERROR" and the current object */
+	@Override
+	public Serializable getErrorNotification()
+	{
+		return getStamp()+" Notif ERROR - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+	
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,57 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods 
+ * in your own action class
+ * 
+ * @author Esteban
+ *
+ */
+public class DummyFileAction extends AbstractFileAction 
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP DomElement - the configuration tree
+ * @param p_oCurr Object  - the object to process
+ */
+	public DummyFileAction(DomElement p_oP, Object p_oCurr) throws Exception
+	{
+		super(p_oP,p_oCurr);
+	}
+	
+/**
+ * Just log the object that was received
+ * <br/>String concatenation prints a null object as "null", the same way
+ * the notification methods do
+ */
+	@Override
+	public void processCurrentObject() throws Exception 
+	{
+		m_oLogger.info("processObject was called with <<"
+				+m_oCurr+">>");
+	} //________________________________
+	
+	// HH (hour of day 0-23) instead of hh (1-12): there is no AM/PM marker
+	// in the pattern, so a 12 hour clock would produce ambiguous stamps
+	private final SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+	private String getStamp() 
+		{ return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+/** @return Serializable - time stamp, "Notif OK" and the current object */
+	@Override
+	public Serializable getOkNotification()
+	{
+		return getStamp()+" Notif OK - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+
+/** @return Serializable - time stamp, "Notif ERROR" and the current object */
+	@Override
+	public Serializable getErrorNotification()
+	{
+		return getStamp()+" Notif ERROR - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+	
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,57 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods 
+ * in your own action class
+ * 
+ * @author Esteban
+ *
+ */
+public class DummySqlRowAction extends AbstractSqlRowAction 
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP DomElement - the configuration tree
+ * @param p_oCurr Object  - the object to process
+ */
+	public DummySqlRowAction(DomElement p_oP, Object p_oCurr) throws Exception
+	{
+		super(p_oP,p_oCurr);
+	}
+	
+/**
+ * Just log the object that was received
+ * <br/>String concatenation prints a null object as "null", the same way
+ * the notification methods do
+ */
+	@Override
+	public void processCurrentObject() throws Exception 
+	{
+		m_oLogger.info("processObject was called with <<"
+				+m_oCurr+">>");
+	} //________________________________
+	
+	// HH (hour of day 0-23) instead of hh (1-12): there is no AM/PM marker
+	// in the pattern, so a 12 hour clock would produce ambiguous stamps
+	private final SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+	private String getStamp() 
+		{ return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+/** @return Serializable - time stamp, "Notif OK" and the current object */
+	@Override
+	public Serializable getOkNotification()
+	{
+		return getStamp()+" Notif OK - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+
+/** @return Serializable - time stamp, "Notif ERROR" and the current object */
+	@Override
+	public Serializable getErrorNotification()
+	{
+		return getStamp()+" Notif ERROR - <"
+		+((null==m_oCurr)?"null":m_oCurr.toString())
+		+">";
+	} //________________________________
+	
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,107 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.soa.esb.actions;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+
+import org.jboss.soa.esb.helpers.*;
+
+/**
+ * Action class that copies the input file to each of the destinations
+ * configured in the "CopyTo" child elements of the parameter tree
+ * <p/>The content is first written to a temporary file (suffix ".notReady")
+ * in each destination directory, and temporary files are renamed to their
+ * final names only after the whole content has been copied
+ */
+public class FileCopier extends AbstractFileAction
+{
+  private static final String TMP_SUFFIX    = ".notReady";
+  private static final String PARMS_COPY_TO = "CopyTo";
+  private static final String PARMS_OUTDIR  = "copyToDirURI";
+  private static final String PARMS_OUTSFX  = "copyToSuffix";
+
+  private File []     m_oaTmpFile;
+  private File []     m_oaOutFile;
+
+/**
+ * @param p_oP DomElement - Parameter tree, with the "CopyTo" child elements
+ * @param p_oCurr Object  - The object to process (see AbstractFileAction)
+ * @throws Exception - if the destinations can't be set up (see checkMyParms)
+ */
+  public FileCopier(DomElement p_oP, Object p_oCurr) throws Exception
+  { super(p_oP,p_oCurr);
+    checkMyParms();
+  } //__________________________________
+
+/**
+ * Copy the input file to all temporary files, and then rename each
+ * temporary file to it's final name
+ * @throws Exception - on I/O problems, or if a temporary file can't be
+ * renamed to it's final name.  All streams are closed in both cases
+ */
+  public void processCurrentObject() throws Exception
+  {
+      FileInputStream oFI     = null;
+      FileOutputStream []oaFO = new FileOutputStream[m_oaTmpFile.length];
+      try
+      {
+        // Open & create temp files
+        oFI = new FileInputStream(getInputFile());
+        for (int i1=0; i1<oaFO.length;i1++)
+          oaFO[i1]  = new FileOutputStream(m_oaTmpFile[i1]);
+
+        //  Perform Copy
+        byte[] ba = new byte[50000];
+        for (int iQ = oFI.read(ba); iQ >= 0; iQ = oFI.read(ba))
+        { for (int i1=0; i1<oaFO.length; i1++)
+            oaFO[i1].write(ba,0,iQ);
+        }
+        //  Close outputs here, so that a failure to close is not ignored
+        for (int i1=0; i1<oaFO.length; i1++)  oaFO[i1].close();
+      }
+      finally
+      { // no stream is left open, whatever happened above
+        closeQuietly(oFI);
+        for (int i1=0; i1<oaFO.length; i1++)  closeQuietly(oaFO[i1]);
+      }
+
+      //  Rename
+      for (int i1=0; i1<m_oaTmpFile.length;i1++)
+      { m_oaOutFile[i1].delete();
+        // don't report success if the copy is still under it's temp name
+        if (! m_oaTmpFile[i1].renameTo(m_oaOutFile[i1]))
+          throw new java.io.IOException("Unable to rename "+m_oaTmpFile[i1]
+              +" to "+m_oaOutFile[i1]);
+      }
+  } //__________________________________
+
+/**
+ * Close a stream ignoring problems - intended for cleanup only
+ * @param p_oC java.io.Closeable - the stream to close (null is accepted)
+ */
+  private static void closeQuietly(java.io.Closeable p_oC)
+  { if (null==p_oC)   return;
+    try { p_oC.close(); }
+    catch (Exception e) { /* cleanup only - nothing else can be done */ }
+  } //__________________________________
+
+/**
+ * Build the lists of temporary and final files, one pair for each
+ * "CopyTo" child element
+ * <br/>A warning is logged if no "CopyTo" elements are found
+ * @throws Exception - if attribute "copyToDirURI" is missing or is not a
+ * valid file URI, or if the temporary file can't be created
+ */
+  protected void checkMyParms() throws Exception
+  { DomElement[] oaPout = m_oParms.getElementChildren(PARMS_COPY_TO);
+    m_oaTmpFile  = new File[oaPout.length];
+    m_oaOutFile  = new File[oaPout.length];
+
+    if (oaPout.length<1)
+    { m_oLogger.warn("No output files specified for FileCopier");
+      return;
+    }
+    String sFile  = getInputFile().getName();
+    // File.createTempFile() requires a prefix of 3 characters or more
+    String sPrefix = (sFile.length() < 3) ? sFile+"___" : sFile;
+
+    for (int i1=0; i1<m_oaTmpFile.length;i1++)
+    { String sDirURI = oaPout[i1].getAttr(PARMS_OUTDIR);
+      if (null==sDirURI)
+        throw new IllegalArgumentException("Missing attribute "+PARMS_OUTDIR
+            +" in element "+PARMS_COPY_TO);
+      File oDir  = new File(new URI(sDirURI));
+      m_oaTmpFile[i1] = File.createTempFile(sPrefix, TMP_SUFFIX, oDir);
+      // a missing suffix means 'no suffix' (and not the literal "null")
+      String sCpySuffix = oaPout[i1].getAttr(PARMS_OUTSFX);
+      if (null==sCpySuffix)
+        sCpySuffix = "";
+      m_oaOutFile[i1] = new File(oDir,sFile+sCpySuffix);
+    }
+  } //__________________________________
+
+  @Override
+  public Serializable getOkNotification() 
+  {
+	  return "File "+getInputFile()+" successfully copied to all destinations";
+  } //__________________________________
+	
+  @Override
+  public Serializable getErrorNotification()
+  {
+	  return "Problems copying "+getInputFile()+" to configured destinations";
+  } //__________________________________
+
+} //____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -24,345 +24,138 @@
 package org.jboss.soa.esb.listeners;
 
 import java.util.*;
+import java.lang.reflect.Constructor;
 
 import org.apache.log4j.*;
 
-import javax.naming.*;
-import javax.jms.*;
-
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.actions.AbstractAction;
 import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.processors.AbstractProcessor;
 
-public abstract class AbstractPoller
+public abstract class AbstractPoller implements Runnable, Observer
 {
+  protected abstract List<Object> pollForCandidates();
+  protected abstract Object		preProcess	(Object p_o) throws Exception;
 	
-  protected abstract GroupOfChilds	newChildGroup(ThreadGroup pThG) throws Exception;
-  
-  // You can override these three values at constructor time of your
-  // derived class after calling super(String)
+  // You can override these values at constructor time of your
+  // derived class after calling super(GpListener,DomElement)
   protected int	m_iMinPollMillis	= 3000	 // minimum polling interval
   				,m_iDfltPollMillis	= 20000	 // default polling interval
-  				,m_iDfltReloadMillis= 180000 // default interval between
-  											 // parameter reloading
+  				,m_iSleepForThreads	= 3000	// default sleep if no threads available
+  				,m_iUpperThreadLimit = 10	// just in case - override if you wish 
   ;
   
-  public static final String PARM_ACTION_CLASS		= "actionClass";
-
   public static final String PARM_POLL_LTCY			= "pollLatencySecs";
-  public static final String PARM_RELOAD_LTCY 		= "parmsReloadSecs";
 
-  public static final String PARM_TOPIC_CONN_FACT	= "topicConnFactoryClass";
-  public static final String PARM_QUIESCE_TOPIC		= "quiesceTopic";
-  public static final String PARM_QUIESCE_SELECTOR	= "quiesceSelector";
+  protected int 		m_iQthr = 0, m_iMaxThr;
+  protected int			m_iPollMillis;
 
-  protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
-  protected Map<String,GroupOfChilds> m_omChildPrc
-  		= new HashMap<String,GroupOfChilds>();
+  protected ThreadGroup m_oThrGrp = null;
 	
-  protected ParamRepository m_oParmRepos;
-  protected String	 	m_oParmName;
   protected Logger		m_oLogger;
 
+  protected GpListener	m_oDad;
   protected DomElement	m_oParms;
-  protected boolean		m_bEndRequested;
-	
-  protected TopicConnection m_oTopicConn;
-  protected TopicSession 	m_oSession;
-  protected Topic 			m_oTopic;
-  protected TopicSubscriber m_oTopicSubs;
-  
-	
+  protected Class 		m_oExecClass;
 
-  protected AbstractPoller(String p_sParamsUid) throws Exception
+  protected AbstractPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
   {
-    m_oLogger = Util.getDefaultLogger(this.getClass());
-
-    m_oParmRepos = ParamRepositoryFactory.getInstance();
-    m_oParmName = p_sParamsUid;
+    m_oLogger	= Logger.getLogger(this.getClass());
+    m_oDad		= p_oDad;
+    m_oParms	= p_oParms.cloneObj();
+    checkParms();
   } //__________________________________
-  
-  protected void runUntilEndRequested() throws Exception
-  {	while (! m_bEndRequested)
-  	{ try 
-  	  {	String sMsg = (null == m_oParms) 
-  				? "Initial Parameter loading" : "Reloading Params";
-  	    m_oLogger.info(formatLogMsg(sMsg));
-  		m_oParms = DomElement.fromXml(m_oParmRepos.get(m_oParmName));
-  	  }
-  	  catch (Exception e)
-  	  {	
-  	    StringBuilder sb = new StringBuilder ("Problems loading params ")
-  			.append(m_oParmName)
-  			.append((null==m_oParms)? " exiting..." : "continuing to use cached params") 
-  			;
-  	  	m_oLogger.error(formatLogMsg(sb.toString()));
-  	  	if (null==m_oParms)
-  	  		throw e;
-  	  }
-  	  quiesceTopicSubscribe();
-	  executeOneCycle();
-  	}
-  } //__________________________________
-  
-  private void executeOneCycle() throws Exception
+/**
+ * Check for mandatory and optional attributes in parameter tree
+ * 
+ * @throws Exception - if actionClass not specified or not in classpath
+ *  or invalid int values for maxThreads or pollLatencySecs
+ */  
+  protected void checkParms() throws Exception
   {
-    String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
-    long lNewLoad = System.currentTimeMillis()
-        + ( (null != sAtt)
-        	? (1000 * Integer.parseInt(sAtt)) 
-            : m_iDfltReloadMillis
-           );
+	  String sAtt	= GpListener.obtainAtt(m_oParms
+			  	,GpListener.PARM_ACTION_CLASS,null);
+	  m_oExecClass	= GpListener.checkActionClass(sAtt);
+	  
+	  sAtt			= GpListener.obtainAtt(m_oParms
+			  	,GpListener.PARM_MAX_THREADS,"1");
+	  int iMax		= Integer.parseInt(sAtt);
+	  m_iMaxThr		= Math.min(iMax,m_iUpperThreadLimit);
 
-    DomElement[] oaParms = m_oParms.getAllElemChildren();
+	  sAtt			= m_oParms.getAttr(PARM_POLL_LTCY);
+	  m_iPollMillis	= (null==sAtt) ? m_iDfltPollMillis
+	  			: 1000 * Integer.parseInt(sAtt);
+	  if (m_iPollMillis < m_iMinPollMillis)
+		  m_iPollMillis = m_iMinPollMillis;
+  } //________________________________
 
-    sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
-    long lPollLtcy = (null != sAtt) 
-    		? (1000 * Integer.parseInt(sAtt)) 
-    		: m_iDfltPollMillis;
-
-    if (lPollLtcy < m_iMinPollMillis)
-      lPollLtcy = m_iMinPollMillis;
-    
-    boolean bFirst = true;
-    while (System.currentTimeMillis() <= lNewLoad)
-    {
-      for (DomElement oCurr : oaParms)
+  /**
+   * Implementation of Observer interface
+   * <p/> Just count the number of active child threads
+   *  
+   */
+      public void update(Observable p_oObs, Object p_oUsrObj)
       {
-        oneScan(oCurr, bFirst);
-      }
-      long lSlack = lNewLoad - System.currentTimeMillis();
-      if (lSlack < 0)
-      {
-        break;
-      }
-      if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
-      { m_bEndRequested = true;
-        return;
-      }
-      bFirst = false;
-    }
-  } //_________________________________________
+        if (p_oUsrObj instanceof Integer)
+          m_iQthr += ((Integer) p_oUsrObj).intValue();
+      } //________________________________
 
-  protected String formatLogMsg(String p_s)
-  {	return new StringBuilder("Processor '")
-    	.append(getClass().getSimpleName()).append("'  <")
-    	.append(p_s).append(">")
-    	.toString();
-  } //__________________________________
-  
-  private final void quiesceTopicSubscribe() throws JMSException, NamingException
+  /**
+   * Implement run method for this Runnable
+   * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
+   * no more looping allowed for all child classes
+   * <p/> This condition will not prevent child processes to finish normally
+   */
+  public void run()
   {
-    try
-    {
-      m_oTopicConn	= null;
-      m_oSession	= null;
-      m_oTopic		= null;
-      m_oTopicSubs	= null;
-      
-      String sStopTopic	= m_oParms.getAttr(PARM_QUIESCE_TOPIC);
-      if (Util.isNullString(sStopTopic))
-    	  return;
-      String sFactClass	= m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
-      if (Util.isNullString(sFactClass))
-    	  sFactClass = "ConnectionFactory";
+	  m_oThrGrp	= new ThreadGroup(m_oParms.getName());
+	  while (m_oDad.continueLooping())
+	  {
+		  List <Object> olPending = pollForCandidates();
+		  if (olPending.size() < 1)
+		  {	try {	Thread.sleep(m_iPollMillis); }
+		  	catch (InterruptedException e) { return; }
+		  	continue;
+		  }
 
-      String sJndiType = SystemProperties.getJndiServerType();
-      String sJndiURL = SystemProperties.getJndiServerURL();
-      Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
-      Object tmp = oJndiCtx.lookup(sFactClass);
-      TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+		  for (Object oCurr : olPending)
+		  {
+			  if (m_iQthr >= m_iMaxThr)
+			  {	m_oLogger.info("Waiting for available threads...(max="
+					  +m_iMaxThr+")");
+				try { Thread.sleep(m_iSleepForThreads); }
+				catch (InterruptedException e) {return; }
+				break;
+			  }
 
-      m_oTopicConn = tcf.createTopicConnection();
-      m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
-      m_oSession = m_oTopicConn.createTopicSession
-      		(false,TopicSession.AUTO_ACKNOWLEDGE);
-      m_oTopicConn.start();
+			  // give the derived class an opportunity to do something
+			  // before processing current object
+			  Object oProcess = null;
+			  try 
+			  { if (null==(oProcess = preProcess(oCurr)))
+					  continue;
+			  }
+			  catch (Exception ePre)
+			  {	m_oLogger.error("preProcess(Object) FAILED",ePre);
+			  	continue;
+			  }
 
-      String sSelector	= m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
-      if (Util.isNullString(sSelector))
-    	  sSelector = "processor='"+getClass().getSimpleName()+"'";
-      m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
-    }
-    catch (Exception e)
-    { m_oLogger.error("Problems connecting to JMS. ",e);
-    }
-
-  } //_________________________________________
-
-  protected boolean waitForQuiesce(long p_lMillis) throws Exception
-  {
-    try
-    { boolean bSleep = (null== m_oTopicSubs);
-      Object oMsg = (bSleep) ? null : secureReceive(p_lMillis);
-    
-      if (null!=oMsg)
-        { m_oLogger.info("Starting Quiesce of "
-        		+getClass().getSimpleName());
-          return true;
-        }
-      if (bSleep)
-    	  Thread.sleep(p_lMillis);
-      return false;
-
-    }
-    catch (Exception e)
-    { m_oLogger.error("Problems with waitForQuiesce. ",e);
-      Thread.sleep(p_lMillis);
-      return false;
-    }
-  } //_________________________________________
-
-  private Message secureReceive(long p_lMillis) throws Exception
-  {
-    while (true)
-    try
-    { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); }
-    catch (JMSException e)
-    {
-      // put here your recovery code
-      return null;
-    }
-
-  } //_________________________________________
-
-
-  protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
-  {
-    String sPrcName = p_oP.getName();
-    if (!m_omChildPrc.containsKey(sPrcName))
-    {
-      ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
-      int iMax = m_oThrGrp.enumerate(oaCh);
-
-      ThreadGroup oThG = null;
-      for (int i1 = 0; null == oThG && i1 < iMax; i1++)
-      {	if (m_oThrGrp.getName().equals(sPrcName))
-          oThG = oaCh[i1];
-      }
-      if (null == oThG)
-        oThG = new ThreadGroup(sPrcName);
-      m_omChildPrc.put(sPrcName, newChildGroup(oThG));
-    }
-    GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
-
-    if (null == oChildGrp)				return;
-    if (p_bFirst)
-      oChildGrp.m_bError = false;
-
-    try
-    {
-      oChildGrp.execute(p_oP);
-    }
-    catch (Exception e)
-    {
-      oChildGrp.m_bError = true;
-      m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
-    }
-  } //_________________________________________
-
-  protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
-		throws Exception
-  {
-	  String sVal	= p_oP.getAttr(p_sAtt);
-	  if (null==sVal)
-		  sVal	= p_sDefault;
-	  if (Util.isNullString(sVal) && (null==p_sDefault))
-		  throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
-	
-	  return sVal;
-  } //________________________________
-
-  protected abstract class GroupOfChilds implements Observer
-  {
-	protected abstract void doYourJob(DomElement p_oP) throws Exception;
-
-	public static final String PARM_MAX_THREADS = "maxThreads";
-	
-    protected ThreadGroup	m_oThrGrp;
-    protected boolean 		m_bError = false;
-
-    protected Class 		m_oExecClass;
-    protected DomElement 	m_oChParms;
-
-    protected int 			m_iQthr = 0, m_iMaxThr;
-    protected StringBuilder m_sb;
-    protected int 			m_iSbIni;
-
-    protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
-    {
-      m_oThrGrp = p_oThrGrp;
-      m_sb = new StringBuilder("GroupOfThreads ")
-          .append(m_oThrGrp.getName()).append(" : ");
-      m_iSbIni = m_sb.length();
-    } //________________________________
-
-    public void update(Observable p_oObs, Object p_oUsrObj)
-    {
-      if (p_oUsrObj instanceof Integer)
-      {
-        updQthreads( ( (Integer) p_oUsrObj).intValue());
-      }
-    } //________________________________
-
-    private synchronized void updQthreads(int p_i)
-    {
-      m_iQthr += p_i;
-    } //________________________________
-
-    private void execute(DomElement p_oP) throws Exception
-    {
-      m_sb.setLength(m_iSbIni);
-      if (m_bError)
-      {
-        m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
-                       .toString());
-        return;
-      }
-      checkParms(p_oP);
-      doYourJob	(p_oP);
-    } //________________________________
-
-    protected void setMaxThreads(DomElement p_oP,int p_iMax)
-    {
-      String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
-      m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
-      m_iMaxThr = (m_iMaxThr < 1) ? 1 
-      			: (m_iMaxThr > p_iMax) ? p_iMax
-      			: m_iMaxThr;
-
-    } //________________________________
-
-    protected Class checkActionClass(String p_sName) throws Exception
-    {
-  	Class oCls;
-  	try	{ oCls = Class.forName(p_sName); }
-  	catch (ClassNotFoundException e)
-  	{ throw new Exception(formatLogMsg("Class "+p_sName
-  			+" not found in classpath"));
-  	}
-  	try	{ oCls.getConstructor(new Class[] {DomElement.class}); }
-  	catch (NoSuchMethodException eN)
-  	{ throw new Exception(formatLogMsg("No appropriate constructor "
-  			+p_sName+"(DomElement) found for class "));
-  	}
-  	return oCls;
-    } //_________________________________________
-
-    // Very basic checkParms method
-    // Remember to call super.checkParms(p_oP) in derived checkParms() methods
-    // so your parameters will be cloned
-    // and to add REAL parameter checking
-	protected void checkParms(DomElement p_oP) throws Exception
-	{	
-    	m_sb.setLength(m_iSbIni);
-    	m_oChParms	= p_oP.cloneObj();
-    	m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
-		setMaxThreads(p_oP,10);		
-	} //________________________________
-
-  } //______________________________________________________
-  
+			  AbstractAction oExec = null;
+			  try
+			  {	Constructor oConst = m_oExecClass
+			  		.getConstructor(GpListener.getActionClassArgs());
+			  	oExec = (AbstractAction)oConst
+			  		.newInstance(new Object[] {m_oParms,oProcess});
+			  }
+			  catch (Exception e)
+			  {	m_oLogger.error("Can't instantiate action class",e);
+				break;
+			  }
+			  // launch an instance of the AbstractAction in a child thread
+			  m_iQthr += 1;
+			  oExec.addObserver(this);
+			  new Thread(oExec).start();
+		  }
+	    }
+  } //__________________________________
+ 
 } //____________________________________________________________________________

Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,184 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-
-
-package org.jboss.soa.esb.listeners;
-
-import java.io.*;
-import java.lang.reflect.Constructor;
-import java.net.*;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.log4j.Logger;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.processors.*;
-
-public class BetterDirListener extends AbstractPoller
-{
-  public static void main(String[] args) throws Exception
-  {
-    new BetterDirListener(args[0]);
-  } //________________________________
-
-  public BetterDirListener(String p_sParamsUid) throws Exception
-  {
-	super(p_sParamsUid);
-	m_iDfltReloadMillis	= 180000;
-	m_iDfltPollMillis	= 20000;
-	m_iMinPollMillis	= 5000;
-//	 See superclass - It provides ability to request end by subscribing to a Topic
-	runUntilEndRequested();
-  } //__________________________________
-
-  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
-  {
-  	return new MyChildGroup(pThG);
-  } //_________________________________________
-
-  private class MyChildGroup extends AbstractPoller.GroupOfChilds
-  {
-    public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
-	public static final String PARM_INPUT_DIR 	= "inputDirURI";
-	public static final String PARM_SUFFIX 		= "inputSuffix";
-
-    private File 		m_oInpDir;
-    private FileFilter	m_oFFilt;
-
-    private MyChildGroup(ThreadGroup p_oThrGrp) throws Exception
-    {	super(p_oThrGrp);    	
-    } //________________________________
-
-	@Override
-	protected void doYourJob(DomElement p_oP) throws Exception
-	{
-		File[] oaF = m_oInpDir.listFiles(m_oFFilt);
-
-		for (File oFcurr : oaF)
-		{
-			if (m_iQthr >= m_iMaxThr)
-	        {
-	          m_oLogger.info(m_sb.append("Waiting for available threads").toString());
-	          Thread.sleep(5000);
-	          break;
-	        }
-	        m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
-	        DomElement oThisProc = new DomElement(AbstractProcessor.PARMS_THIS_INSTANCE);
-	        oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
-	        m_oChParms.addElemChild(oThisProc);
-
-	        new Thread(m_oThrGrp,
-	                   new FileChildProcess(m_oExecClass, this, m_oChParms)).start();
-	        Thread.sleep(500);
-	      }
-	} //________________________________
-
-	protected void checkParms(DomElement p_oP) throws Exception
-    { 
-	  super.checkParms(p_oP);
-
-      String sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
-      if (null == sAtt)
-      {	throw new Exception(formatLogMsg(
-        		m_sb.append("Missing ").append(PARM_INPUT_DIR)
-                            .append(" attribute in -parameters ")
-                            .toString()));
-      }
-      m_oInpDir = new File(new URI(sAtt));
-      if (!m_oInpDir.isDirectory())
-      {	throw new Exception(formatLogMsg(
-        		m_sb.append(sAtt).append(" is not a directory").toString()));
-      }
-      if (!m_oInpDir.canRead())
-      {	throw new Exception(formatLogMsg(
-    		  m_sb.append("Can't read directory ").append(sAtt).
-                            toString()));
-      }
-
-      sAtt = m_oChParms.getAttr(PARM_SUFFIX);
-      if (null == sAtt)
-      {	throw new Exception(formatLogMsg(
-        		m_sb.append("Missing ").append(PARM_SUFFIX)
-                	.append(" attribute in -parameters ")
-                    .toString()));
-      }
-
-      m_oFFilt = new FileEndsWith(sAtt);
-
-      sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
-      m_oExecClass = null;
-      if (null == sAtt)
-      { throw new Exception(formatLogMsg(
-    		  m_sb.append("Missing fileProcessorClass attribute").
-    		  		toString()));
-      }
-      m_oExecClass = super.checkActionClass(sAtt);
-
-    } //________________________________
-
-
-    private class FileEndsWith implements FileFilter
-    {
-      String m_sSuffix;
-      FileEndsWith(String p_sEnd) throws Exception
-      {
-        m_sSuffix = p_sEnd;
-        if (Util.isNullString(m_sSuffix))
-          throw new Exception("Must specify file extension");
-      } //_________________________________________
-
-      public boolean accept(File p_f)
-      {	return (p_f.isFile())
-        	? p_f.toString().endsWith(m_sSuffix)
-        	: false;
-      } //_________________________________________
-    } //___________________________________________________
-  } //______________________________________________________
-
-  protected static class FileChildProcess extends Observable implements Runnable
-  { private Class       m_oExecClass;
-    private DomElement  m_oParms;
-    private Logger      m_oLogger;
-    public FileChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP)
-    { m_oLogger = Util.getDefaultLogger(this.getClass());
-      m_oExecClass  = p_oExec;
-      this.addObserver(p_oObs);
-      m_oParms  = p_oP;
-      setChanged();
-      notifyObservers(new Integer(1));
-    } //__________________________________
-
-    public void run()
-    { try
-      { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class});
-        Object oProc = oCnst.newInstance(new Object[] {m_oParms});
-        ((FileProcessor)oProc).execute();
-      }
-      catch (Exception e) { m_oLogger.error("run() FAILED",e); }
-
-      setChanged();
-      notifyObservers(new Integer(-1));
-    } //__________________________________
-  } //______________________________________________________
-
-} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,175 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.actions.AbstractFileAction;
+import org.jboss.soa.esb.helpers.*;
+
+/** Polls a directory for files with a given suffix and prepares each one for the action class. */
+public class DirectoryPoller extends AbstractPoller
+{
+  public static final String FILE_INPUT_DIR 	= "inputDirURI";
+  public static final String FILE_INPUT_SFX 	= "inputSuffix";
+  public static final String FILE_WORK_SFX 		= "workSuffix";
+  public static final String FILE_ERROR_DIR   	= "errorDirURI";
+  public static final String FILE_ERROR_SFX   	= "errorSuffix";
+  public static final String FILE_POST_DIR  	= "postDirURI";
+  public static final String FILE_POST_SFX  	= "postSuffix";
+  public static final String FILE_POST_DEL  	= "postDelete";
+
+  /** Lets AbstractPoller initialise itself, then validates the directory attributes. */
+  public DirectoryPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+  {
+	super(p_oDad,p_oParms);
+	checkMyParms();
+  } //__________________________________
+
+    protected File 			m_oInpDir	,m_oErrorDir	,m_oPostDir;
+    protected FileFilter	m_oFFilt;
+    protected String 		m_sInpSfx	,m_sWrkSfx		,m_sErrSfx	,m_sPostSfx;
+    protected boolean		m_bPostDel;
+
+    /**
+     * Renames the candidate file with the work suffix and describes it for the action class.
+     * @param p_o Object - Must be a File representing the file that has to be processed
+     * @return Object - null if p_o is not a File or the rename failed, otherwise an
+     * AbstractFileAction.Params containing:
+     * <p/>oInpF - the input file as it was found
+     * <p/>oWrkF - renamed file (workSuffix appended to input file name)
+     * <p/>oErrF - target file name in case actionClass is unable to complete successfully
+     * <p/>oDoneF - target file name in case actionClass finishes successfully
+     */
+	@Override
+	public Object preProcess(Object p_o) throws Exception 
+	{
+		if (!(p_o instanceof File))
+			return null;
+		File oF = (File)p_o;
+		File oNameWrk = new File (oF.getParentFile(),oF.getName()+m_sWrkSfx);
+		if (! oF.renameTo(oNameWrk))
+			return null;
+		AbstractFileAction.Params oCurr = new AbstractFileAction.Params();
+		oCurr.bPostDelete	= m_bPostDel;
+		oCurr.oInpF			= oF;
+		oCurr.oWrkF			= oNameWrk;
+		oCurr.oErrF			= new File (m_oErrorDir	,oF.getName()+m_sErrSfx);
+		// NOTE(review): when postDelete is true checkMyParms() leaves m_oPostDir/m_sPostSfx unset
+		oCurr.oDoneF		= new File (m_oPostDir	,oF.getName()+m_sPostSfx);
+		return oCurr;
+	} //________________________________
+
+	/** @return List - files accepted by the suffix filter; empty (never null) if listFiles() returns null */
+	@Override
+	protected List<Object> pollForCandidates()
+	{
+		File[] oaF = m_oInpDir.listFiles(m_oFFilt);
+		if (null==oaF)	// listFiles() yields null on an I/O error or when the path is no longer a directory
+			return Collections.<Object>emptyList();
+		return Arrays.asList((Object[])oaF);
+	} //________________________________
+
+	/** Reads and validates the input/work/error/post attributes of m_oParms; throws on the first invalid one. */
+	protected void checkMyParms() throws Exception
+    { 
+	//  INPUT directory and suffix  (used for FileFilter)
+	  String sInpDir = GpListener.obtainAtt(m_oParms,FILE_INPUT_DIR,null);
+      m_oInpDir = new File(new URI(sInpDir));
+      seeIfOkToWorkOnDir(m_oInpDir);
+
+      m_sInpSfx  = GpListener.obtainAtt(m_oParms,FILE_INPUT_SFX,null);
+      m_sInpSfx  = m_sInpSfx.trim();
+      if (m_sInpSfx.length()<1)
+    	  throw new Exception ("Invalid "+FILE_INPUT_SFX+" attribute");
+	  m_oFFilt = new FileEndsWith(m_sInpSfx);
+
+	//  WORK suffix (will rename in input directory)
+	//  The filter tests endsWith(inputSuffix), so suffixes are checked with endsWith too (not equals)
+      m_sWrkSfx	= GpListener.obtainAtt(m_oParms,FILE_WORK_SFX,".esbWork").trim();
+      if (m_sWrkSfx.length()<1)
+    	  throw new Exception ("Invalid "+FILE_WORK_SFX+" attribute");
+      if (m_sWrkSfx.endsWith(m_sInpSfx))
+    	  throw new Exception("Work suffix must not end with input suffix <"+m_sWrkSfx+">");
+
+    //    ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
+      String sErrDir = GpListener.obtainAtt(m_oParms,FILE_ERROR_DIR,sInpDir);
+      m_oErrorDir = new File(new URI(sErrDir));
+      seeIfOkToWorkOnDir(m_oErrorDir);
+      m_sErrSfx  = GpListener.obtainAtt(m_oParms,FILE_ERROR_SFX,".esbError").trim();
+      if (m_sErrSfx.length()<1)
+    	  throw new Exception ("Invalid "+FILE_ERROR_SFX+" attribute");
+      if (m_oErrorDir.equals(m_oInpDir) && m_sErrSfx.endsWith(m_sInpSfx))
+    	  throw new Exception("Error suffix must not end with input suffix <"+m_sErrSfx+">");
+
+   //    Do users wish to delete files that were processed OK ?
+      String sPostDel = GpListener.obtainAtt(m_oParms,FILE_POST_DEL,"false").trim();
+      m_bPostDel = Boolean.parseBoolean(sPostDel);
+      if (m_bPostDel)
+    	  return;
+
+    //    POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
+      String sPostDir = GpListener.obtainAtt(m_oParms,FILE_POST_DIR,sInpDir);
+      m_oPostDir = new File(new URI(sPostDir));
+      seeIfOkToWorkOnDir(m_oPostDir);
+      m_sPostSfx  = GpListener.obtainAtt(m_oParms,FILE_POST_SFX,".esbDone").trim();
+      if (m_oPostDir.equals(m_oInpDir))
+      {	if (m_sPostSfx.length()<1)
+    	  throw new Exception ("Invalid "+FILE_POST_SFX+" attribute");
+      	if (m_sPostSfx.endsWith(m_sInpSfx))
+    	  throw new Exception("Post process suffix must not end with input suffix <"+m_sPostSfx+">");
+      }
+    } //________________________________
+
+	/** Throws unless p_oDir exists, is a directory, and is both readable and writable. */
+	protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
+	{
+      if (! p_oDir.exists())   
+    	  throw new Exception ("Directory "+p_oDir.toString()+" not found");
+      if (!p_oDir.isDirectory())
+    	  throw new Exception(p_oDir.toString()+" is not a directory");
+      if (!p_oDir.canRead())
+    	  throw new Exception("Can't read directory "+p_oDir.toString());
+      if (! p_oDir.canWrite()) 
+    	  throw new Exception ("Can't write/rename in directory "+p_oDir.toString());
+	} //________________________________
+
+    /** Accepts regular files whose path ends with the given suffix. */
+    private static class FileEndsWith implements FileFilter
+    {
+      String m_sSuffix;
+      FileEndsWith(String p_sEnd) throws Exception
+      {
+        m_sSuffix = p_sEnd;
+        if (Util.isNullString(m_sSuffix))
+          throw new Exception("Must specify file extension");
+      } //______________________________
+      public boolean accept(File p_f)
+      {	return p_f.isFile() && p_f.toString().endsWith(m_sSuffix);
+      } //______________________________
+    } //____________________________________________________
+} //____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,617 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.lang.reflect.*;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.notification.NotificationList;
+import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Controlling class that will launch listener child threads 
+ * for supported transport listener classes, as indicated
+ * in the configuration XML tree pointed by arg[0]
+ * 
+ * <p /> Can be launched as uppermost controller (it has a main(args) method)
+ * <p /> Also implements Runnable, and can thus be launched in a child 
+ * thread from an upper controlling process
+ * <p /> Listens on a JMS queue (with an optional message selector) 
+ * for commands (e.g. Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p /> Parameter reloading can also be set using the PARM_RELOAD_SECS 
+ * attribute 
+ * <p /> End time for this instance can also be set using the PARM_END_TIME 
+ * attribute  
+ * <p />
+ * 
+ * @author Esteban
+ *
+ */
+public class GpListener implements Runnable
+{
+	/** Command-line entry point: args[0] is the parameter name; exits with the final State's completion code. */
+	public static void main(String[] args) throws Exception
+	{
+		if (null==args || args.length < 1)	// fail with a usable message, not ArrayIndexOutOfBounds
+			throw new IllegalArgumentException("Usage: GpListener <parameterName>");
+		GpListener oProc = new GpListener(args[0]);
+		oProc.run();
+		GpListener.State oS = oProc.getState();
+		Exception oFailure = oS.getException();
+		if (null!=oFailure)
+			oProc.m_oLogger.error("GpListener <"+args[0]+"> FAILED\n",oFailure);
+		System.exit(oS.getCompletionCode());
+	} //________________________________
+	
+	protected int	m_iDfltReloadMillis= 180000 // default interval between parameter reloads
+	;
+	public static final String COMMAND_CONN_FACTORY	= "commandConnFactoryClass";
+	public static final String COMMAND_JNDI_TYPE	= "commandJndiType";
+	public static final String COMMAND_JNDI_URL 	= "commandJndiURL";
+	public static final String COMMAND_IS_TOPIC		= "commandIsTopic";
+	public static final String COMMAND_JNDI_NAME	= "commandJndiName";
+	public static final String COMMAND_MSG_SELECTOR	= "messageSelector";
+	public static final String PARM_RELOAD_SECS		= "parameterReloadSecs";
+	public static final String PARM_END_TIME    	= "endTime";
+	
+	//  Attribute name that denotes listener class to be instantiated in a child thread
+	//  This attribute is not in the root node but in first level child DomElements
+	public static final String PARM_LISTENER_CLASS	= "listenerClass";
+	
+    public static final String PARM_ACTION_CLASS	= "actionClass";
+    public static final String PARM_MAX_THREADS		= "maxThreads";
+
+
+	private Logger	m_oLogger;
+
+	private ParamRepository m_oParmRepos;
+	private String			m_sParmsName;
+	private DomElement		m_oParms;
+
+	private HashMap<String,Object>	m_oAtts;
+	/**
+	 * Hands out a shallow copy of the attribute map held in m_oAtts.
+	 * <p/>The map is cloned so child threads can add or remove entries in their
+	 * copy without interfering with the controller's own map; the values
+	 * themselves are shared, not copied.
+	 * <p/>Listener processes controlled by this object should fetch what they
+	 * need at construction time and not call this method again unless they
+	 * specifically need updated values, because a parameter reload could have
+	 * happened since the last call.
+	 *
+	 * @return Map - a shallow copy of the attributes Map
+	 */
+	@SuppressWarnings("unchecked")
+	public	Map<String,Object> getControllerAttributes()
+		{ Object oCopy = m_oAtts.clone();	return (Map<String,Object>)oCopy; }
+	
+	private boolean	m_bReloadRequested ,m_bEndRequested;
+	private long	m_lNextReload	= Long.MAX_VALUE;
+	private long	m_lEndTime		= Long.MAX_VALUE;
+	public static final SimpleDateFormat s_oDateParse	// HH = 24-hour clock; "hh" would read 12:xx as 00:xx
+		= new SimpleDateFormat("yyyyMMdd HH:mm:ss");
+
+	private State	m_oState	= null;
+	public  State	getState()	{return m_oState; }
+	/** Life-cycle phases of a GpListener; code and exception are stored on the enum constant itself. */
+	public static enum State
+	{
+		Loading_parameters, Running, Shutting_down, Done_OK, Exception_thrown;
+
+		// default access: the enclosing listener assigns these directly
+		int			m_iCompletionCode;
+		Exception	m_oException;
+
+		public int getCompletionCode()		{ return this.m_iCompletionCode; }
+
+		public Exception getException()		{ return this.m_oException; }
+	};
+	
+    private MessageConsumer m_oCmdSrc;
+    private Session			m_oJmsSess;
+    private Connection		m_oJmsConn;
+    
+	/**
+	 * Builds a listener and performs the initial parameter load (fetch XML by name, parse, checkParms).
+	 * @throws Exception - any load failure, after logging it and storing it in State.Exception_thrown
+	 */
+	public GpListener(String p_sParameterName) throws Exception
+	{	m_oLogger		= Logger.getLogger(this.getClass());
+		m_sParmsName	= p_sParameterName;
+		m_oParmRepos	= ParamRepositoryFactory.getInstance();
+		m_oState		= State.Loading_parameters;
+		try
+		{	m_oParms = DomElement.fromXml(m_oParmRepos.get(m_sParmsName));
+			checkParms(m_oParms);
+		}
+		catch (Exception eLoad)
+		{	m_oState = State.Exception_thrown;
+			m_oState.m_oException = eLoad;
+			m_oLogger.fatal("Problems with initial parameter load - <"+p_sParameterName+">",eLoad);
+			throw eLoad;
+		}
+	} //________________________________
+	/**
+	 * Check to see if all needed parameters are there, and assign default
+	 * values to some of them.
+	 * <p/>If COMMAND_JNDI_NAME is set, opens a JMS consumer on that topic/queue,
+	 * after closing the session and connection left by any previous call.
+	 * <p/>Also recomputes the next reload time and the end time, and on success
+	 * stores the collected command attributes in m_oAtts.
+	 * @param p_oP  DomElement - Where to look for the mandatory/optional configuration attributes
+	 * @throws Exception  -  If attributes are wrong or not enough for a proper runtime configuration
+	 */
+	public void checkParms(DomElement p_oP) throws Exception
+	{
+		// We've just loaded - set to false until next reload requested
+		m_bReloadRequested = false;
+		m_oCmdSrc	= null;
+		closePreviousJms();	// a reload must not leak what the previous load opened
+		HashMap<String,Object> oNewAtts = new HashMap<String,Object>();
+
+		// Only check for JMS attributes if a queue JNDI name was specified
+		String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+		if (! Util.isNullString(sJndiName))
+		{
+			oNewAtts.put(COMMAND_JNDI_NAME,sJndiName);
+			String sJndiType = obtainAtt(p_oP,COMMAND_JNDI_TYPE,"jboss");
+			oNewAtts.put(COMMAND_JNDI_TYPE,sJndiType);
+			String sJndiURL = obtainAtt(p_oP,COMMAND_JNDI_URL,"localhost");
+			oNewAtts.put(COMMAND_JNDI_URL,sJndiURL);	
+			Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+			String sFactClass = obtainAtt(p_oP,COMMAND_CONN_FACTORY,"ConnectionFactory");
+			oNewAtts.put(COMMAND_CONN_FACTORY,sFactClass);
+			if (Util.isNullString(sFactClass))
+	    	  sFactClass = "ConnectionFactory";
+			Object oFactCls = oJndiCtx.lookup(sFactClass);
+			String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+			if (null!=sMsgSelector)
+				oNewAtts.put(COMMAND_MSG_SELECTOR,sMsgSelector);
+
+			boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,COMMAND_IS_TOPIC,"false"));
+			if (bIsTopic)
+			{	TopicConnection oTC = ((TopicConnectionFactory)oFactCls).createTopicConnection();
+				m_oJmsConn = oTC;	// kept at once, so it can still be closed if a later step throws
+				Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+				TopicSession oSess = oTC.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+				m_oJmsSess = oSess;
+				oTC.start();
+				m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector,true);
+			}
+			else
+			{	QueueConnection oQC = ((QueueConnectionFactory)oFactCls).createQueueConnection();
+				m_oJmsConn = oQC;	// kept at once, so it can still be closed if a later step throws
+				javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx.lookup(sJndiName);
+				QueueSession oSess = oQC.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+				m_oJmsSess = oSess;
+				oQC.start();
+				m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+			}
+		}
+
+		// PARM_RELOAD_SECS wins when present.  Otherwise: no command source -> no timed
+		// reload; command source present -> reload after m_iDfltReloadMillis.
+		// NOTE(review): the comment this replaces described the opposite default - confirm intent
+		String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
+		m_lNextReload = (null!=sRldSecs)
+			? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
+			: (null==m_oCmdSrc)	? Long.MAX_VALUE
+			: System.currentTimeMillis() + m_iDfltReloadMillis
+		;
+
+		// if PARM_END_TIME not set try to run forever
+		// not a good practice if command queue is not set
+		// The expected date format is the pattern of s_oDateParse
+        String sEndT = p_oP.getAttr(PARM_END_TIME);
+		m_lEndTime	= (null==sEndT) ? Long.MAX_VALUE
+				: s_oDateParse.parse(sEndT).getTime();		
+		m_oAtts = oNewAtts;	// publish only once every check above has passed
+	} //________________________________
+	/** Best-effort close of the JMS session/connection opened by an earlier checkParms() call. */
+	private void closePreviousJms()
+	{	try { if (null!=m_oJmsSess) m_oJmsSess.close(); }
+		catch (JMSException eS) { m_oLogger.warn("Could not close previous JMS session",eS); }
+		try { if (null!=m_oJmsConn) m_oJmsConn.close(); }
+		catch (JMSException eC) { m_oLogger.warn("Could not close previous JMS connection",eC); }
+		m_oJmsSess = null;
+		m_oJmsConn = null;
+	} //________________________________
+
+/**
+ * Main execution loop.
+ * <p/> Keeps cycling until either 
+ * <p/>a) the configured run time has expired
+ * <p/>b) a quiesce command arrives on the command queue 
+ * <p/>On every cycle, each child element carrying a PARM_LISTENER_CLASS 
+ * attribute gets a child thread that runs a new instance of that class.
+ * <p/>Once all child listeners have been triggered, the main thread either
+ * <p/>1) waits for a message on the command queue (if one was configured)
+ * until the next reload or the end of the run period, or
+ * <p/>2) just sleeps when there is no command queue to listen on.
+ * <p/>When it is time to reload, parameters are fetched again; if that fails
+ * the cached version stays in use.  On exit the JMS session and connection
+ * are closed on a best-effort basis.
+ */
+	public void run() 
+	{
+		while (endNotRequested())
+		{
+			m_oState	= State.Running;
+			// one launch attempt per child element that names a listener class
+			for (DomElement oChild : m_oParms.getAllElemChildren())
+			{	String sListenerCls = oChild.getAttr(PARM_LISTENER_CLASS);
+				if (! Util.isNullString(sListenerCls))
+					tryToLaunchChildListener(oChild,sListenerCls);
+			}
+
+			waitForCmdOrSleep();
+
+			if (endRequested())
+				break;
+			if (! timeToReload())
+				continue;
+			try
+			{
+				m_oState	= State.Loading_parameters;
+				m_oLogger.info("Reloading parameters _____________________________________________________");
+				String sXml = m_oParmRepos.get(m_sParmsName);
+				DomElement oReloaded = DomElement.fromXml(sXml);
+				checkParms(oReloaded);
+				m_oParms	= oReloaded;
+			}
+			catch (Exception eReload)
+			{
+				m_oLogger.error("Failed to reload parameters - Continuing with cached version",eReload);
+			}
+		}
+
+		m_oState	= State.Done_OK;
+		m_oState.m_iCompletionCode = 0;
+		m_oLogger.info("Finishing_____________________________________________________");
+
+		// closing is best effort: a failure here is ignored on purpose
+		if (null!=m_oJmsSess)
+			try { m_oJmsSess.close(); }
+			catch (JMSException eS) {/* Tried my best - Just continue */}
+		if (null!=m_oJmsConn)
+			try { m_oJmsConn.close(); }
+			catch (JMSException eC) {/* Tried my best - Just continue */}
+	} //________________________________
+	
+	/**
+	 * Instantiate the listener class named in arg 1 and start it in a new thread
+	 * <p/>The class must implement Runnable and have a public constructor with
+	 * arguments (GpListener, DomElement)
+	 * <p/>Any failure (class not found, no such constructor, constructor threw
+	 * an exception, not a Runnable) is logged and otherwise ignored, so that a
+	 * misconfigured child does not prevent the remaining ones from being launched
+	 *
+	 * @param p_oP DomElement - parameter subtree passed to the child's constructor
+	 * @param p_sClassName String - fully qualified name of the listener class
+	 */
+	private void tryToLaunchChildListener(DomElement p_oP,String p_sClassName)
+	{
+		try
+		{	Class<?> oListener		= Class.forName(p_sClassName);
+			// Look the constructor up by the declared parent type:
+			// getConstructor() needs an exact match of the formal parameter
+			// types, so using this.getClass() would end up in a
+			// NoSuchMethodException whenever 'this' is a subclass of GpListener
+			Constructor<?> oConst	= oListener.getConstructor
+				(new Class[] {GpListener.class,DomElement.class});
+			Runnable oRun = (Runnable)oConst.newInstance(new Object[] {this,p_oP});
+			new Thread(oRun).start();
+		}
+		catch (Exception e)
+		{
+			m_oLogger.error("Cannot launch <"+p_sClassName+">\n",e);
+		}
+	} //________________________________
+
+	/**
+	 * Time left until the earlier of next parameter reload and end of run time
+	 * @return long - milliseconds from now to that instant; zero or negative
+	 * if it has already passed
+	 */
+	long millisToWait()
+	{
+		long lDeadline = (m_lNextReload <= m_lEndTime) ? m_lNextReload : m_lEndTime;
+		return lDeadline - System.currentTimeMillis();
+	} //________________________________
+
+	/**
+	 * Block the main thread until it is time to reload parameters or to end
+	 * <p/>Without a command source (m_oCmdSrc is null) this is a plain sleep
+	 * of millisToWait() milliseconds; an interrupt during the sleep is taken
+	 * as a request to end (end time is set to 0)
+	 * <p/>With a command source, messages are received and passed to
+	 * processCommand() until the time is exhausted, the receive times out,
+	 * or a command requests end or reload. Messages that are not a
+	 * TextMessage are logged and skipped
+	 */
+	private void waitForCmdOrSleep()
+	{
+		long lToGo = millisToWait(); 
+
+		if (null==m_oCmdSrc)
+		{
+			// Deadline might already be in the past: Thread.sleep() throws
+			// IllegalArgumentException on negative values, which would
+			// kill the main thread - nothing to wait for in that case
+			if (lToGo <= 0)
+				return;
+			m_oLogger.debug("About to sleep "+lToGo);
+			// No command queue nor topic - Just sleep until time
+			// exhausted, or thread interrupted
+			try { Thread.sleep(lToGo); }
+			catch (InterruptedException e)
+			{
+				m_lEndTime = 0;	// mark as end requested and return
+			}
+			return;
+		}
+		
+		// Wait for commands until time exhausted or command received
+		// Note that received commands might change time variables (reload/end)
+		// that's why time to go is recalculated on each cycle
+		while ((lToGo = millisToWait()) > 0)
+		{
+			try
+			{	
+				m_oLogger.info("Waiting for command ... timeout="+lToGo+" millis");
+				// for the time being, only text messages allowed
+				// THIS WILL CHANGE !!
+				Message oM  = m_oCmdSrc.receive(lToGo);
+				if (null==oM)
+					return;
+				if (! (oM instanceof TextMessage))
+				{
+					m_oLogger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+					// Really ignore it: returning here would send run() back
+					// to launch all child listeners again although neither
+					// end nor reload is due
+					continue;
+				}
+				processCommand((TextMessage)oM);
+				if (endRequested() || timeToReload())
+					break;
+			}
+			catch (JMSException eJ)
+			{
+				// NOTE(review): a persistent receive failure makes this loop
+				// retry without pause until the deadline - consider a back off
+				m_oLogger.info("receive on command queue failed",eJ);
+			}
+		}
+	} //________________________________
+
+/**
+ * Interprets the text of a message received in the command queue (or topic)
+ * <p/>Depending on the command, m_bEndRequested, m_bReloadRequested or
+ * m_lEndTime will be changed. Text is trimmed and compared in lower case.
+ * Unknown commands and messages without text are silently ignored
+ *
+ * <p/>
+ * <p/><TABLE border="1">
+ *	<COLGROUP> <COL width="200"/> <COL width="400"/> </COLGROUP>
+ *	<TR> <TD align="center">message text</TD><TD align="center">effect</TD> </TR>
+ *	<TR>
+ *		<TD>shutdown*</TD>
+ *		<TD>Shutdown is flagged as requested - quiesce process will start - Child threads will be allowed to finish normally</TD>
+ *	</TR>
+ *	<TR>
+ *		<TD>reload param*</TD>
+ *		<TD>Parameter reload is flagged as requested</TD>
+ *	</TR>
+ *	<TR>
+ *		<TD>endTime  yyyyMMdd hh:mm:ss</TD>
+ *		<TD>End time will be set to new value.
+ *			If hh:mm:ss is not supplied => end of day assumed (23:59:59)</TD>
+ *	</TR>
+ *</TABLE>
+ * <p/>* = command is recognized with startsWith()
+ * <p/>
+ * @param p_oMsg TextMessage - Received in command queue/topic
+ *  
+ */
+	private void processCommand (TextMessage p_oMsg)
+	{
+		String sText;
+		try { sText = p_oMsg.getText(); }
+		catch (JMSException eJ)
+		{
+			m_oLogger.info("Problems with command queue",eJ);
+			return;
+		}
+		if (null==sText)
+			return;
+
+		String sCmd = sText.trim().toLowerCase();
+		if (sCmd.startsWith("shutdown"))
+		{
+			m_bEndRequested = true;
+			m_oLogger.info("Shutdown has been requested");
+		}
+		else if (sCmd.startsWith("reload param"))
+		{
+			m_bReloadRequested = true;
+			m_oLogger.info("Request for parameter reload has been received");
+		}
+		else
+		{
+			// Only remaining command: endtime <date> [<time>]
+			String[] saTokens = sCmd.split("\\s+");
+			if (saTokens.length < 2 || ! "endtime".equals(saTokens[0]))
+				return;
+			try
+			{
+				String sTime = "23:59:59";	// end of day unless a time was supplied
+				if (saTokens.length >= 3 && null != saTokens[2])
+					sTime = saTokens[2];
+				Date oEnd = s_oDateParse.parse(saTokens[1]+" "+sTime);
+				m_oLogger.info("New end date set to : "+oEnd);
+				m_lEndTime = oEnd.getTime();
+			}
+			catch (Exception eDat)
+			{
+				m_oLogger.info("Problems with endTime command",eDat);
+			}
+		}
+	} //________________________________
+
+/**
+ * Tells if processing has to stop, either because a shutdown has been
+ * requested or because the end time has been reached
+ * @return boolean - true if processing has to stop (all child threads will be allowed to finish)
+ */
+	public boolean endRequested() 
+	{
+		if (m_bEndRequested)
+			return true;
+		return m_lEndTime <= System.currentTimeMillis();
+	}
+/**
+ * Accessor to determine if execution time is not expired, and no shutdown request received
+ * @return boolean - true if run time has not expired and quiesce
+ * has not been requested
+ */
+	public boolean endNotRequested() 
+	{	return ! endRequested();
+	}
+
+/**
+ * Common accessor to find out if parameters have to be reloaded, either
+ * because a reload has been requested or because reload time has been reached
+ * <p/> For child threads this means thread execution has to end
+ * <p/> Child processes should only call this method when they are idle
+ * (as opposed to in the middle of executing a unit of work)
+ * @return boolean - true if it's time to reload parameters
+ */
+	public boolean timeToReload() 
+	{
+		if (m_bReloadRequested)
+			return true;
+		return m_lNextReload <= System.currentTimeMillis();
+	}
+	
+/**
+ * Helper accessor that lets child processes find out if they are allowed
+ * to start yet another execution cycle
+ * @return boolean - true only if end has not been requested and it is not
+ * yet time to reload parameters
+ */	
+	public boolean continueLooping()
+	{	
+		if (! endNotRequested())
+			return false;
+		return ! timeToReload();
+	} //________________________________
+
+/**
+ * Get the value of an attribute of an Element, falling back to a default
+ * 
+ * @param p_oP 		DomElement - look for attributes in this Element only
+ * @param p_sAtt 	String - Name of attribute to find
+ * @param p_sDefault String - value to return if the attribute is not there;
+ * 					null means the attribute is mandatory
+ * @return 			String - value of attribute if present, default value otherwise 
+ * @throws Exception - If requested attribute not found and no default value
+ * supplied by invoker
+ */
+    static String obtainAtt
+    	(DomElement p_oP, String p_sAtt, String p_sDefault)
+	throws Exception
+	{
+	  String sFound	= p_oP.getAttr(p_sAtt);
+	  if (null!=sFound)
+		  return sFound;
+	  if (null==p_sDefault)
+		  throw new Exception("Missing or invalid <"+p_sAtt+"> attribute");
+
+	  return p_sDefault;
+	} //________________________________
+	
+    // Formal argument types (DomElement,Object) of the constructor that
+    // checkActionClass() requires from every action class
+    private static Class[] s_oaActionConstr = {DomElement.class, Object.class};
+    /**
+     * Argument types of the constructor used to instantiate action classes
+     * @return Class[] - {DomElement.class, Object.class}. A fresh copy is
+     * returned on each call, so that callers are not able to alter the
+     * shared array used by checkActionClass()
+     */
+    public static Class[] getActionClassArgs() { return s_oaActionConstr.clone(); }
+
+    /**
+     * Check to see if an object of the class (arg 0) can be instantiated
+     * as an action in this context
+     * <p/>Checks are made in this order: class can be loaded, class has a
+     * public constructor (DomElement,Object), class extends AbstractAction
+     * 
+     * @param p_sName String - name of the class to check 
+     * - Must extend AbstractAction
+     * @return Class - the loaded class 
+     * @throws Exception - if class not found in path, no appropriate
+     * constructor, or class does not extend AbstractAction. Where there is
+     * an underlying exception it is kept as the cause
+     */
+     protected static Class checkActionClass(String p_sName) throws Exception
+     {
+	   	Class<?> oCls;
+	   	try	
+	   	{	oCls = Class.forName(p_sName);
+	   	}
+	   	catch (ClassNotFoundException e)
+	   	{ throw new Exception("Class "+p_sName
+	   			+" not found in classpath",e);
+	   	}
+	
+	   	try	
+	   	{	oCls.getConstructor(s_oaActionConstr);
+	   	}
+	   	catch (NoSuchMethodException eN)
+	   	{ throw new Exception("No appropriate constructor "
+	   			+"(DomElement,Object) found for class "+p_sName,eN);
+	   	}
+	   	// Plain type test instead of provoking a ClassCastException
+	   	if (! AbstractAction.class.isAssignableFrom(oCls))
+	   		throw new Exception("class "+p_sName
+	   				+ " does not extend "+AbstractAction.class.getName());
+	   	return oCls;
+     } //_________________________________________
+
+/**
+ * Send a notification to all child nodes named "NotificationList" for which
+ * NotificationList.isOK() is true (documented as: attribute 'type' starts
+ * with "ok", case insensitive)
+ * <p/>Notification is best effort: failures are logged, never thrown
+ * @param p_oP - DomElement to search for "NotificationList" child Elements
+ * @param p_oSer Serializable - Will constitute the body of the notification
+ */
+ 	public static void notifyOK(DomElement p_oP, Serializable p_oSer)
+	{ try
+	  { 
+		for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT))
+		{ NotificationList oNL = new NotificationList(oCurr);
+		  if (! oNL.isOK())    continue;
+		  InotificationHandler oNH = getNotifHandler();
+		  // No handler available - getNotifHandler() has already logged why
+		  if (null==oNH)       return;
+		  // NOTE(review): notifyError() passes the NotificationList (oNL)
+		  // instead of the DomElement - confirm both overloads are intended
+		  oNH.sendNotifications(oCurr,p_oSer);
+		}
+	   }
+	   catch (Exception e)
+	   {	// Still best effort, but failures no longer vanish without a trace
+		   Logger.getLogger(GpListener.class).error("OK notification FAILED",e);
+	   }
+	} //__________________________________
+
+ 	/**
+ 	 * Send an error notification to all child nodes named "NotificationList"
+ 	 * for which NotificationList.isErr() is true (documented as: attribute
+ 	 * 'type' starts with "err", case insensitive, or no 'type' attribute set)
+ 	 * <p/>Notification is best effort: failures are logged, never thrown
+ 	 * @param p_oP - DomElement to search for "NotificationList" child Elements
+ 	 * @param p_e  - Exception if not null, its stack trace will be appended to the body
+ 	 * @param p_oSer Serializable - if not null, its toString() will be included
+ 	 * at the beginning of the body of the notification
+ 	 */
+	public static void notifyError(DomElement p_oP,Exception p_e, Serializable p_oSer)
+	{ 
+	    try
+	    {
+	      ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+	      PrintStream oPS = new PrintStream(oBO);
+	      // A null body used to raise a NullPointerException that was
+	      // swallowed below, so the error was never notified at all
+	      if (null != p_oSer) oPS.println(p_oSer.toString());
+	      if (null != p_e) p_e.printStackTrace(oPS);
+	      oPS.close();
+
+	      String sMsg = oBO.toString();
+	      for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT))
+	      { NotificationList oNL = new NotificationList(oCurr);
+	        if (! oNL.isErr())    continue;
+	        InotificationHandler oNH = getNotifHandler();
+	        // No handler available - getNotifHandler() has already logged why
+	        if (null==oNH)        return;
+	        oNH.sendNotifications(oNL,sMsg);
+	      }
+	    }
+	    catch (Exception e)
+	    {	// Still best effort, but failures no longer vanish without a trace
+	    	Logger.getLogger(GpListener.class).error("Error notification FAILED",e);
+	    }
+	} //________________________________
+	  
+	  // volatile is required by the unsynchronized first check in
+	  // getNotifHandler(): without it another thread could see a reference
+	  // to a handler whose construction is not yet visible to it
+	  private static volatile InotificationHandler s_oNH;
+	  // Dedicated monitor object, used only to serialize handler creation
+	  private static final Object s_oSync = new Object(); 
+/**
+ * Lazy instantiator of a InotificationHandler
+ * <p/>The handler is requested from NotificationHandlerFactory on first use
+ * and cached; if instantiation fails the error is logged and a new attempt
+ * will be made on the next call
+ * @return - a reference to an implementation of the interface or null if it
+ *   can't be instantiated
+ */
+	  protected static InotificationHandler getNotifHandler()
+	  {
+		// Single volatile read on the fast path
+		InotificationHandler oNH = s_oNH;
+		if (null!=oNH)
+			return oNH;
+		synchronized (s_oSync)
+		{	// Check again - another thread might have won the race
+			if (null==s_oNH)
+			try {	s_oNH = NotificationHandlerFactory.getNotifHandler
+			  			("remote"
+			  			,SystemProperties.getJndiServerType()
+			  			,SystemProperties.getJndiServerURL()
+			  			);
+				}
+			catch (Exception e)
+				{	Logger.getLogger(GpListener.class).error("Notification FAILED",e);
+				}
+			return s_oNH;
+		}
+	  } //______________________________
+	  
+} //____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,9 +1,28 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
 package org.jboss.soa.esb.listeners;
 
-import java.io.*;
 import java.lang.reflect.*;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Observer;
 import java.util.Observable;
 
@@ -12,512 +31,171 @@
 import javax.naming.*;
 import javax.jms.*;
 
-import org.jboss.soa.esb.services.*;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.actions.AbstractAction;
 import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.notification.*;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.processors.*;
 
-public class JmsQueueListener
+public class JmsQueueListener implements Runnable, Observer
 {  
-  // You can override this value at constructor time of your
-  // derived class after calling super(String)
-  protected int	m_iDfltReloadMillis= 180000 // default interval between
-  											 // parameter reloading
+  // You can override these values at constructor time of your derived class
+  protected  int
+  			m_iSleepForThreads	= 3000	// default sleep if no threads available
+			,m_iUpperThreadLimit = 10	// just in case - override if you wish 
   ;
-  
-  //attributes that need to be set in the xml parameter file for the service
-  public static final String PARM_ACTION_CLASS		= "actionClass";
-
-  public static final String PARM_RELOAD_LTCY 		= "parmsReloadSecs";
-
-  public static final String PARM_TOPIC_CONN_FACT	= "topicConnFactoryClass";
-  public static final String PARM_QUIESCE_TOPIC		= "quiesceTopic";
-  public static final String PARM_QUIESCE_SELECTOR	= "quiesceSelector";
-
   public static final String LISTEN_QUEUE_CONN_FACT	= "queueConnFactoryClass";
+  public static final String LISTEN_JNDI_TYPE		= "listenJndiType";
+  public static final String LISTEN_JNDI_URL 		= "listenJndiURL";
   public static final String LISTEN_QUEUE			= "listenQueue";
   public static final String LISTEN_MSG_SELECTOR	= "listenMsgSelector";
 
-  protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
-  protected Map<String,GroupOfChilds> m_omChildPrc
-  		= new HashMap<String,GroupOfChilds>();
+  protected boolean 		m_bError = false;
+
+  protected QueueConnection m_oQconn;
+  protected QueueSession	m_oQsess;
+  protected Queue			m_oQueue;
+  protected String			m_sSelector;
+  protected MessageConsumer m_oRdr;
+
+
+  protected int 		m_iQthr = 0, m_iMaxThr;
+
+  protected ThreadGroup m_oThrGrp = null;
 	
-  protected ParamRepository m_oParmRepos;
-  protected String	 	m_oParmName;
   protected Logger		m_oLogger;
 
+  protected GpListener	m_oDad;
   protected DomElement	m_oParms;
-  protected boolean		m_bEndRequested;
-	
-  protected TopicConnection m_oTopicConn;
-  protected TopicSession 	m_oSession;
-  protected Topic 			m_oTopic;
-  protected TopicSubscriber m_oTopicSubs;
-  
-  protected long			m_lNextReload;
-  
-  
+  protected Class 		m_oExecClass;
 
-  public JmsQueueListener(String p_sParamsUid) throws Exception
+  public JmsQueueListener(GpListener p_oDad, DomElement p_oParms) throws Exception
   {
-    m_oLogger = Util.getDefaultLogger(this.getClass());
-
-    m_oParmRepos = ParamRepositoryFactory.getInstance();
-    m_oParmName = p_sParamsUid;
-    runUntilEndRequested();
+	    m_oLogger	= Logger.getLogger(this.getClass());
+	    m_oDad		= p_oDad;
+	    m_oParms	= p_oParms.cloneObj();
+	    checkMyParms();
+	    m_oThrGrp	= new ThreadGroup(m_oParms.getName());
   } //__________________________________
   
-  protected void runUntilEndRequested() throws Exception
-  {	while (! m_bEndRequested)
-  	{ try 
-  	  {	String sMsg = (null == m_oParms) 
-  				? "Initial Parameter loading" : "Reloading Params";
-  	    m_oLogger.debug(formatLogMsg(sMsg));
-  		m_oParms	= DomElement.fromXml(m_oParmRepos.get(m_oParmName)); 
-  	  }
-  	  catch (Exception e)
-  	  {	
-  	    StringBuilder sb = new StringBuilder ("Problems loading params ")
-  			.append(m_oParmName)
-  			.append((null==m_oParms)? " exiting..." : "continuing to use cached params") 
-  			;
-  	  	m_oLogger.error(formatLogMsg(sb.toString()));
-  	  	if (null==m_oParms)
-  	  		throw e;
-  	  }
-  	  quiesceTopicSubscribe();
-	  executeOneCycle();
-  	}
-  } //__________________________________
-  
-  private void executeOneCycle() throws Exception
-  {
-    String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
-    m_lNextReload = System.currentTimeMillis()
-        + ( (null != sAtt)
-        	? (1000 * Integer.parseInt(sAtt)) 
-            : m_iDfltReloadMillis
-           );
+  /**
+   * Check for mandatory and optional attributes in parameter tree
+   * 
+   * @throws Exception - if mandatory atts are not right
+   * 			or actionClass not in classpath 
+   */  
+	protected void checkMyParms() throws Exception
+	{
+		String sAtt	= GpListener.obtainAtt(m_oParms
+				,GpListener.PARM_ACTION_CLASS,null);
+		m_oExecClass= GpListener.checkActionClass(sAtt);
+		  
+		sAtt		= GpListener.obtainAtt(m_oParms
+				  	,GpListener.PARM_MAX_THREADS,"1");
+		int iMax	= Integer.parseInt(sAtt);
+		m_iMaxThr	= Math.min(iMax,m_iUpperThreadLimit);
 
-    DomElement[] oaParms = m_oParms.getAllElemChildren();
+		// Third arg is null - Exception will be thrown if listenQueue is not found
+		String sQueue = GpListener.obtainAtt(m_oParms,LISTEN_QUEUE,null);
+		
+		// Third arg is null - Exception will be thrown if actionClass is not found
+		String sAction = GpListener.obtainAtt
+			(m_oParms,GpListener.PARM_ACTION_CLASS,null);
+		GpListener.checkActionClass(sAction);
 
-    boolean bFirst = true;
-    while (System.currentTimeMillis() <= m_lNextReload)
-    {
-      for (DomElement oCurr : oaParms)
-      {
-        oneScan(oCurr, bFirst);
-      }
-      if (waitForQuiesce(5000))
-      { m_bEndRequested = true;
-        return;
-      }
-      bFirst = false;
-    }
-  } //_________________________________________
+		// No problem if selector is null - everything in queue will be returned
+		m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
 
-  protected String formatLogMsg(String p_s)
-  {	return new StringBuilder("Processor '")
-    	.append(getClass().getSimpleName()).append("'  <")
-    	.append(p_s).append(">")
-    	.toString();
-  } //__________________________________
-  
-  private final void quiesceTopicSubscribe() throws JMSException, NamingException
-  {
-    try
-    {
-      m_oTopicConn	= null;
-      m_oSession	= null;
-      m_oTopic		= null;
-      m_oTopicSubs	= null;
-      
-      String sStopTopic	= m_oParms.getAttr(PARM_QUIESCE_TOPIC);
-      if (Util.isNullString(sStopTopic))
-    	  return;
-      String sFactClass	= m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
-      if (Util.isNullString(sFactClass))
-    	  sFactClass = "ConnectionFactory";
+		m_oQconn		= null;
+		m_oQsess		= null;
+		m_oQueue		= null;
+	      
+		String sJndiType = GpListener.obtainAtt(m_oParms
+				,LISTEN_JNDI_TYPE,"jboss");
+		String sJndiURL	 = GpListener.obtainAtt(m_oParms
+				,LISTEN_JNDI_URL,"localhost");
+		Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
 
-      String sJndiType = SystemProperties.getJndiServerType();
-      String sJndiURL = SystemProperties.getJndiServerURL();
-      Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
-      Object tmp = oJndiCtx.lookup(sFactClass);
-      TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+		String sFactClass	= GpListener.obtainAtt(m_oParms
+				,LISTEN_QUEUE_CONN_FACT,"ConnectionFactory");
+		Object tmp = oJndiCtx.lookup(sFactClass);
+		QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
 
-      m_oTopicConn = tcf.createTopicConnection();
-      m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
-      m_oSession = m_oTopicConn.createTopicSession
-      		(false,TopicSession.AUTO_ACKNOWLEDGE);
-      m_oTopicConn.start();
-
-      String sSelector	= m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
-      if (Util.isNullString(sSelector))
-    	  sSelector = "processor='"+getClass().getSimpleName()+"'";
-      m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
-    }
-    catch (Exception e)
-    { m_oLogger.error("Problems connecting to JMS. ",e);
-    }
-
-  } //_________________________________________
-
-  protected boolean waitForQuiesce(long p_lMillis) throws Exception
-  {
-    try
-    { boolean bSleep = (null== m_oTopicSubs);
-      Object oMsg = (bSleep) ? null : secureQuiesceReceive(p_lMillis);
-    
-      if (null!=oMsg)
-        { m_oLogger.info("Starting Quiesce of "
-        		+getClass().getSimpleName());
-          return true;
-        }
-      if (bSleep)
-    	  Thread.sleep(p_lMillis);
-      return false;
-
-    }
-    catch (Exception e)
-    { m_oLogger.error("Problems with waitForQuiesce. ",e);
-      Thread.sleep(p_lMillis);
-      return false;
-    }
-  } //_________________________________________
-
-  private Message secureQuiesceReceive(long p_lMillis) throws Exception
-  {
-    while (true)
-    try
-    { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); }
-    catch (JMSException e)
-    {
-      // put here your recovery code
-      return null;
-    }
-
-  } //_________________________________________
-
-
-  protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
-  {
-    String sPrcName = p_oP.getName();
-    if (!m_omChildPrc.containsKey(sPrcName))
-    {
-      ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
-      int iMax = m_oThrGrp.enumerate(oaCh);
-
-      ThreadGroup oThG = null;
-      for (int i1 = 0; null == oThG && i1 < iMax; i1++)
-      {	if (m_oThrGrp.getName().equals(sPrcName))
-          oThG = oaCh[i1];
-      }
-      if (null == oThG)
-        oThG = new ThreadGroup(sPrcName);
-      m_omChildPrc.put(sPrcName, newChildGroup(oThG));
-    }
-    GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
-
-    if (null == oChildGrp)				return;
-    if (p_bFirst)
-      oChildGrp.m_bError = false;
-
-    try
-    {
-      oChildGrp.execute(p_oP);
-    }
-    catch (Exception e)
-    {
-      oChildGrp.m_bError = true;
-      m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
-    }
-  } //_________________________________________
-
-  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
-  	{ return new GroupOfChilds(pThG); }
-
-  protected class GroupOfChilds implements Observer
-  {
-	public static final String PARM_MAX_THREADS = "maxThreads";
-	
-    protected ThreadGroup	m_oThrGrp;
-    protected boolean 		m_bError = false;
-
-    protected Class 		m_oExecClass;
-    protected DomElement 	m_oChParms;
-
-    protected int 			m_iQthr = 0, m_iMaxThr;
-    protected StringBuilder m_sb;
-    protected int 			m_iSbIni;
-
-    protected QueueConnection m_oQconn;
-    protected QueueSession	m_oQsess;
-    protected Queue			m_oQueue;
-    protected String		m_sSelector;
-    
-    protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
-    {
-      m_oThrGrp = p_oThrGrp;
-      m_sb = new StringBuilder("GroupOfThreads ")
-          .append(m_oThrGrp.getName()).append(" : ");
-      m_iSbIni = m_sb.length();
-    } //________________________________
-
-    public void update(Observable p_oObs, Object p_oUsrObj)
-    {
-      if (p_oUsrObj instanceof Integer)
-      {
-        int iAdd = ((Integer) p_oUsrObj).intValue();
-        m_iQthr += iAdd;
-      }
-    } //________________________________
-
-    private void execute(DomElement p_oP) throws Exception
-    {
-      m_sb.setLength(m_iSbIni);
-      if (m_bError)
-      {
-        m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
-                       .toString());
-        return;
-      }
-      checkParms(p_oP);
-      doYourJob	(p_oP);
-    } //________________________________
-
-    protected void setMaxThreads(DomElement p_oP,int p_iMax)
-    {
-      String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
-      m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
-      m_iMaxThr = (m_iMaxThr < 1) ? 1 
-      			: (m_iMaxThr > p_iMax) ? p_iMax
-      			: m_iMaxThr;
-
-    } //________________________________
-
-    protected Class checkActionClass(String p_sName) throws Exception
-    {
-  	Class oCls;
-  	try	{ oCls = Class.forName(p_sName); }
-  	catch (ClassNotFoundException e)
-  	{ throw new Exception(formatLogMsg("Class "+p_sName
-  			+" not found in classpath"));
-  	}
-  	try	{ oCls.getConstructor(new Class[] {DomElement.class}); }
-  	catch (NoSuchMethodException eN)
-  	{ throw new Exception(formatLogMsg("No appropriate constructor "
-  			+p_sName+"(DomElement) found for class "));
-  	}
-  	return oCls;
-    } //_________________________________________
-
-    protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
-		throws Exception
-	{
-	  String sVal	= p_oP.getAttr(p_sAtt);
-	  if (null==sVal)
-		  sVal	= p_sDefault;
-	  if (Util.isNullString(sVal) && (null==p_sDefault))
-		  throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
-	
-	  return sVal;
-	} //________________________________
-	
-	protected void checkParms(DomElement p_oP) throws Exception
-	{	
-    	m_sb.setLength(m_iSbIni);
-    	m_oChParms	= p_oP.cloneObj();
-    	m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
-		setMaxThreads(p_oP,10);
+		m_oQconn = qcf.createQueueConnection();
+		m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+		m_oQsess = m_oQconn.createQueueSession
+			(false,TopicSession.AUTO_ACKNOWLEDGE);
+		m_oQconn.start();
+		m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
 		
-		obtainAtt(p_oP,LISTEN_QUEUE,null);
-		
-		m_sSelector = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
-		
-		String sClass = obtainAtt(p_oP,PARM_ACTION_CLASS,null);
-		if (Util.isNullString(sClass))
-			throw new Exception(formatLogMsg("Missing value for "+PARM_ACTION_CLASS));
-		m_oExecClass = Class.forName(sClass);
-		Constructor m_oConstructor = m_oExecClass.getConstructor	
-				(new Class[]  {DomElement.class});
-	   if (null==m_oConstructor)
-		   throw new Exception ("No constructor "+sClass+"(DomElement) found");
 	} //________________________________
 
-	  protected final void obtainQueue(DomElement p_oP) throws JMSException, NamingException
-	  {
-	    try
-	    {
-	      m_oQconn		= null;
-	      m_oQsess		= null;
-	      m_oQueue		= null;
-	      
-	      String sQueue	= p_oP.getAttr(LISTEN_QUEUE);
-	      String sFactClass	= p_oP.getAttr(LISTEN_QUEUE_CONN_FACT);
-	      if (Util.isNullString(sFactClass))
-	    	  sFactClass = "ConnectionFactory";
-
-	      String sJndiType = SystemProperties.getJndiServerType();
-	      String sJndiURL = SystemProperties.getJndiServerURL();
-	      Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
-	      Object tmp = oJndiCtx.lookup(sFactClass);
-	      QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
-
-	      m_oQconn = qcf.createQueueConnection();
-	      m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
-	      m_oQsess = m_oQconn.createQueueSession
-	      		(false,TopicSession.AUTO_ACKNOWLEDGE);
-	      m_oQconn.start();
-
-	    }
-	    catch (Exception e)
-	    { m_oLogger.error("Problems connecting to JMS. ",e);
-	    }
-
-	  } //_________________________________________
-
-	protected void doYourJob(DomElement p_oP) throws Exception
+    /**
+     * Implement run method for this Runnable
+     * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
+     * no more looping allowed for all child classes
+     * <p/> This condition will not prevent child processes to finish normally
+     */
+	public void run()
 	{
-		obtainQueue(p_oP);
-		while(System.currentTimeMillis() < m_lNextReload)
-        {
-		  if (m_iQthr >= m_iMaxThr)
-			  return;
+		while (m_oDad.continueLooping())
+		{
+			if (m_iQthr >= m_iMaxThr)
+			{	m_oLogger.info("Waiting for available threads...");
+				try { Thread.sleep(m_iSleepForThreads); }
+				catch (InterruptedException e) {return; }
+				break;
+			 }
+			 Message oM = null;
+			 try { oM = m_oRdr.receive(m_oDad.millisToWait()); }
+			 catch (JMSException oJ)
+			 {
+				 m_oLogger.error("JMS error on receive",oJ);
+				 for (int i1=0; i1<3; i1++)
+					 try {checkMyParms(); }  // try to reconnect to the queue
+					 catch (Exception e)
+					 {	m_oLogger.error("Reconnecting to Queue",e);
+					 	try { Thread.sleep(m_iSleepForThreads); }
+					 	catch (InterruptedException e1) 
+					 	{ //Just return
+					 		return;
+						}
+					 }
+			 }
+			 if (null==oM)
+				 continue;
 
-		  MsgChildProcess oNew = getMsgChildProcess(this,p_oP);
-	      new Thread(m_oThrGrp,oNew).start();
-	        // Wait a little bit, so thread count will be updated
-	        // at some point in the past, this sleep was indispensable
-	        // new thread control classes in Java 5 might have solved the problem
-	        Thread.sleep(500);
-        }
+			 AbstractAction oExec = null;
+			 try
+			 {	Constructor oConst = m_oExecClass
+				 	.getConstructor(GpListener.getActionClassArgs());
+			  	oExec = (AbstractAction)oConst.newInstance
+			  		(new Object[] {m_oParms,oM});
+			 }
+			 catch (Exception e)
+			 {	m_oLogger.error("Can't instantiate action class",e);
+			 	break;
+			 }
+			 // invoke the run method of the AbstractAction
+			 m_iQthr += 1;
+			 oExec.addObserver(this);
+			 new Thread(oExec).start();
+		 }
 		if (null!=m_oQsess)
 		      try { m_oQsess.close(); }    
-			catch (Exception e1) {}
+			catch (Exception e1) {/* Tried my best - Just continue */}
 		if (null!=m_oQconn)
 		      try { m_oQconn.close(); }    
-			catch (Exception e2) {}
-
-	} //________________________________
-	
-	protected MsgChildProcess getMsgChildProcess
-		(GroupOfChilds pDad, DomElement p_oP) throws Exception
-		{ return new MsgChildProcess (pDad,p_oP); }
-	
-  } //______________________________________________________
-  
-  protected class MsgChildProcess extends Observable implements Runnable
-  { 
-	protected GroupOfChilds m_oParent;  // you can always go there for common stuff
-	protected DomElement	m_oParms;
-	protected MsgProcessor m_oExec;
-
-	public MsgChildProcess(GroupOfChilds p_oGrp, DomElement p_oParms)
-  		throws Exception
-  	{
-	  m_oLogger.debug("Child "+p_oParms.getName());
-      m_oParent	= p_oGrp;
-      m_oParms	= p_oParms;
-      this.addObserver(m_oParent);
-      setChanged();
-      // add 1 to child thread count
-      notifyObservers(new Integer(1));
-  	} //__________________________________
-
-	 public void run()
-	 {
-	   Class[] oaArgs = {DomElement.class};
-	   MessageConsumer oReader = null;		 
-	   try
-	   {
-	     oReader = m_oParent.m_oQsess.createReceiver
-	   			(m_oParent.m_oQueue, m_oParent.m_sSelector);
-	   
-	   	 long lSlack;
-	   	 while ((lSlack=m_lNextReload-System.currentTimeMillis()) > 0)
-	   	 {
-		   Message oMsg = (null==oReader) ? null
-	    		  : oReader.receive(lSlack);
-		   if (null==oMsg)
-			   continue;
-
-		   try
-		   {	
-			   DomElement oParms = m_oParms.cloneObj();
-		   	   Constructor oCns = m_oParent.m_oExecClass.getConstructor(oaArgs);
-			   Object oInst = oCns.newInstance (new Object[] {oParms});
-		   	   m_oExec = (MsgProcessor)oInst;
-		   	   m_oExec.processMessage(oMsg);
-		   	   notifyOK();
-		   }
-		   catch (Exception e) 
-		   { e.printStackTrace();
-			   notifyError(e);
-	   	   }
-	     }
-	   }
-	   catch (JMSException oJ)
-	   {
-		   m_oLogger.error(oJ);
-	   }
-
-	   setChanged();
-	   // decrease child thread count in parent group
-	   notifyObservers(new Integer(-1));
+			catch (Exception e2) {/* Tried my best - Just continue */}
 	  } //______________________________
-	 
-	  public void notifyOK()
-	  { try
-	    { 
-		  Serializable oNotif = m_oExec.getOkNotification();
-		  for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
-	      { NotificationList oNL = new NotificationList(oCurr);
-	        if (! oNL.isOK())    continue;
-	        getNotifHandler().sendNotifications(oCurr,oNotif);
-	      }
-	    }
-	    catch (Exception e) {}
-	  } //__________________________________
-
-	  public void notifyError(Exception p_e)
-	  { 
-		Serializable oNotif = (null==m_oExec) 
-			? "No action class instantiated" 
-			: m_oExec.getErrorNotification();
-		ByteArrayOutputStream oBO = new ByteArrayOutputStream();
-	    PrintStream oPS = new PrintStream(oBO);
-	    try
-	    { oPS.println(oNotif.toString());
-	      if (null != p_e) p_e.printStackTrace(oPS);
-	      oPS.close();
-
-	      String sMsg = oBO.toString();
-	      for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
-	      { NotificationList oNL = new NotificationList(oCurr);
-	        if (! oNL.isErr())    continue;
-	        getNotifHandler().sendNotifications(oNL,sMsg);
-	      }
-	    }
-	    catch (Exception e) { }
-	  } //__________________________________
-
-	  protected InotificationHandler getNotifHandler()
-	  {
-		try {	return NotificationHandlerFactory.getNotifHandler
-		  			("remote"
-		  			,SystemProperties.getJndiServerType()
-		  			,SystemProperties.getJndiServerURL()
-		  			);
-			}
-		catch (Exception e)
-			{	m_oLogger.error(formatLogMsg("Notification FAILED"),e);
-				return null;
-			}
-	  } //______________________________
 	  
-  } //______________________________________________________
+/**
+ * Observer callback - keeps the count of active child threads up to date
+ * <p/> An Integer argument is added to the counter; anything else is ignored
+ * @param p_oObs Observable - source of the event (not used)
+ * @param p_oUsrObj Object - Integer to be added to the thread count
+ */
+	public void update(Observable p_oObs, Object p_oUsrObj)
+	{	if (! (p_oUsrObj instanceof Integer)) return;
+		m_iQthr = m_iQthr + ((Integer) p_oUsrObj).intValue();
+	} //________________________________
+
   
 } //____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -24,54 +24,24 @@
 package org.jboss.soa.esb.listeners;
 
 import java.util.*;
-import java.io.Serializable;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.*;
 import java.sql.*;
 import javax.sql.*;
 
-import org.jboss.soa.esb.services.InotificationHandler;
-import org.jboss.soa.esb.services.NotificationHandlerFactory;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.SystemProperties;
 import org.jboss.soa.esb.helpers.*;
 import org.jboss.soa.esb.helpers.persist.*;
-import org.jboss.soa.esb.notification.NotificationList;
-import org.jboss.soa.esb.processors.*;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.util.*;
 /**
  * SqlTablePoller class
  * 
- * the  "main(args)" static method of this class will:
- *   1) load the parameters from the Name supplied in args[0] into a DomElement
- *      (See org.jboss.soa.esb.parameters package for options on parameter repositories)
- *   2) for each child element (1st level) of the DomElement, it will try to initiate
- *      a Thread group that can start a maximum of simultaneous child threads 
- *      "maxThreads" supplied in parameters - default=1
- *   3) each thread group will poll a SQL table with parameters defined in the corresponding
- *      DomElement (example below has only 1 child element, but you can have several)
- *   4) Execution will orderly finish when a message is received in the "quiesceTopic"
- *   	with an optional "quiesceSelector"  (please see AbstractPoller class constants) 
- * 
- *   The SQL table(s) that is (are) polled should have
+ *   The SQL table that is polled should have
  *   1) a unique key (see "keyFields" parameter) that will be used to update status
  *   2) a column to indicate the "processing status" of this trigger row (see ROW_STATE enum)
- *      this column will be updated by the SqlChildProcess.run() method (see internal
- *      protected class SqlChildProcess)
  *   
  *   Each retrieved row (see OPTIONAL_ATT.whereCondition) should be considered as a trigger
  *   that is intended to instantiate an object of "actionClass".  The new instance will 
- *   receive the full DomElement (level 1 for each child group), with an added child 
- *   DomElement (see EsbAbstractProcessor.PARMS_THIS_INSTANCE) containing attributes
- *   corresponding to the values of the "selectFields" columns in the row that triggered 
- *   the "actionClass"
+ *   receive the full DomElement (level 1 for each child group)
  *   
- *   The ZZDummyProcessor (in this package) is a trivial actionClass included with the
- *   sole purpose of illustrating what the actionClass will receive, and the entry points
- *   where users can insert their logic
- *   
- *   GOOD LUCK !!
- *
  *	@author Esteban Schifman
  */
 public class SqlTablePoller extends AbstractPoller
@@ -80,12 +50,9 @@
  * 
 <DocumentElementName>
    <ExampleListenChapter
-   		pollLatencySecs="20"
-   		parmsReloadSecs="300"
-
    		maxThreads="2"
-   		   		
-   		actionClass="org.jboss.soa.esb.listeners.ZZDummyProcessor"
+   		listenerClass="org.jboss.soa.esb.listeners.SqlTablePoller"
+   		actionClass="org.jboss.soa.esb.actions.DummySqlRowAction"  		   		
 
     	driver-class="org.postgresql.Driver"
     	connection-url="jdbc:postgresql://myhost:5432/myDB"
@@ -173,152 +140,131 @@
   };
   public static final String DEFAULT_STATES = "PWED";
   
-  protected Map<String,String> m_oVals = new HashMap<String,String>();
+  protected Map<String,String>	m_oVals = new HashMap<String,String>();
+  protected String[]			m_saCols	,m_saKeys;	
+  protected String				m_sUpdStates;
 
-  /**
-   * Main program for SqlTablePoller 
-   * @param args - String[]  First parameter must be path to configuration XML file
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception
+ /**
+  * In this constructor you can override the defaults of the following protected base class fields:
+  * <br/>
+  * <p/>m_iMinPollMillis	: minimum polling interval (default 3000)
+  * <br/>m_iDfltPollMillis : default polling interval (default 20000)
+  * <br/>m_iSleepForThreads	: how long to sleep if all configured threads are in use (default 3000)
+  * <br/>m_iUpperThreadLimit : max number of threads allowed (default 10) 
+  * @param p_oDad GpListener - The controlling process
+  * @param p_oParms DomElement - Sub tree that corresponds to this instance
+  * @throws Exception
+  */
+  public SqlTablePoller(GpListener p_oDad, DomElement p_oParms) throws Exception
   {
-    new SqlTablePoller(args[0]);
-  } //________________________________
-
-
-  public SqlTablePoller(String p_sParamsUid) throws Exception
-  {
-	super(p_sParamsUid);
-	m_iDfltReloadMillis	= 180000;
-	m_iDfltPollMillis	= 20000;
-	m_iMinPollMillis	= 5000;
-//	 See superclass - It provides ability to request end by subscribing to a Topic
-	runUntilEndRequested();
+	super(p_oDad,p_oParms);
+	try { checkMyParms(); }
+	catch (Exception e)
+	{
+		m_oLogger.error("checkMyParms() FAILED",e);
+		throw e;
+	}
   } //__________________________________
-
-  /**
-   * Override this method if you wish to extend the SqlChildProcess class
-   * (for example with ad-hoc notifications for OK list or Error list)
-   * @param p_oParms = DomElement containing attributes of trigger
-   * @return  the SqlChildProcess to be started
-   * @throws Exception
-   */
-  	protected SqlChildProcess getSqlChildProcess
-  		(SqlPollerChildGroup pDad, DomElement p_oParms) throws Exception
-  	{
-  		return new SqlChildProcess(pDad, p_oParms);
-  	} //________________________________
-
-  @Override
-  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
-  {
-  	return new SqlPollerChildGroup(pThG);
-  } //__________________________________
   
-  protected class SqlPollerChildGroup extends AbstractPoller.GroupOfChilds
-  {
-	JdbcCleanConn		m_oConn;
-	String[]			m_saKeys;
-	  
-    protected SqlPollerChildGroup(ThreadGroup p_oThrGrp) throws Exception
-    {	super(p_oThrGrp);    	
+    private void checkAndStoreAtt(DomElement p_oP, String p_sName, String p_sDflt)
+    	throws Exception
+    {
+    	m_oVals.put(p_sName,GpListener.obtainAtt(p_oP,p_sName,p_sDflt));
     } //________________________________
 
-	protected void checkAtt(DomElement p_oP,String p_sAtt, String p_sDefault)
-		throws Exception
-    {	m_oVals.put(p_sAtt,obtainAtt(p_oP,p_sAtt,p_sDefault));		
-	} //________________________________
-
-	protected void checkParms(DomElement p_oP) throws Exception
+	protected void checkMyParms() throws Exception
     { 
-	  super.checkParms(p_oP);
-
-	  checkAtt(p_oP,SimpleDataSource.DRIVER		,null);
-	  checkAtt(p_oP,SimpleDataSource.URL		,null);
-	  checkAtt(p_oP,SimpleDataSource.USER		,"");
-	  checkAtt(p_oP,SimpleDataSource.PASSWORD	,"");
+	  checkAndStoreAtt(m_oParms,SimpleDataSource.DRIVER		,null);
+	  checkAndStoreAtt(m_oParms,SimpleDataSource.URL		,null);
+	  checkAndStoreAtt(m_oParms,SimpleDataSource.USER		,"");
+	  checkAndStoreAtt(m_oParms,SimpleDataSource.PASSWORD	,"");
 	  
 	  for (TABLE_ATT oCurr : TABLE_ATT.values())
-		  checkAtt(p_oP,oCurr.toString(),null);
+		  checkAndStoreAtt(m_oParms,oCurr.toString(),null);
 	  
-	  checkAtt(p_oP,OPTIONAL_ATT.whereCondition.toString(),"");
-	  checkAtt(p_oP,OPTIONAL_ATT.orderBy.toString(),"");
+	  checkAndStoreAtt(m_oParms,OPTIONAL_ATT.whereCondition.toString(),"");
+	  checkAndStoreAtt(m_oParms,OPTIONAL_ATT.orderBy.toString(),"");
 
 	  String sAtt = OPTIONAL_ATT.inProcessVals.toString();
-	  checkAtt(p_oP,sAtt,DEFAULT_STATES);
-	  String sStates = m_oVals.get(sAtt);
-	  if (sStates.length()<4)
-		  throw new Exception(formatLogMsg
-				  ("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)"
-				  ));
+	  checkAndStoreAtt(m_oParms,sAtt,DEFAULT_STATES);
+	  m_sUpdStates = m_oVals.get(sAtt);
+	  if (m_sUpdStates.length()<4)
+		  throw new Exception("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)");
 
-	  checkAtt(p_oP,PARM_ACTION_CLASS,null);
-	  m_oExecClass = super.checkActionClass(m_oVals.get(PARM_ACTION_CLASS));
-	  
 	  StringTokenizer ST = new StringTokenizer
 	  	(m_oVals.get(TABLE_ATT.selectFields.toString()),",");
+	  m_saCols = new String[ST.countTokens()];
 	  Set<String> oSelFlds = new HashSet<String>();
+	  int iCurr = 0;
 	  while (ST.hasMoreElements())
-		  oSelFlds.add(ST.nextToken().trim());
+	  {
+		  String sColName = ST.nextToken().trim(); 
+		  m_saCols[iCurr++]	= sColName;
+		  oSelFlds.add(sColName);
+	  }
 
 	  ST = new StringTokenizer
 	  	(m_oVals.get(TABLE_ATT.keyFields.toString()),",");
 	  m_saKeys = new String[ST.countTokens()];
 	  if (m_saKeys.length < 1)
-			throw new Exception(formatLogMsg("Empty list of keyFields"));
+			throw new Exception("Empty list of keyFields");
 
-	  for (int iCurr = 0; ST.hasMoreTokens(); iCurr++)
-	  {	String sKey = ST.nextToken().trim();
-		if (! oSelFlds.contains(sKey))
-			throw new Exception(formatLogMsg("Key field <"+ sKey + "> must also be in select list"));
-		m_saKeys[iCurr]	= sKey;
+	  for (iCurr = 0; ST.hasMoreTokens(); iCurr++)
+	  {	String sKeyCol = ST.nextToken().trim();
+		if (! oSelFlds.contains(sKeyCol))
+			throw new Exception("Key field <"+ sKeyCol + "> must also be in select list");
+		m_saKeys[iCurr]	= sKeyCol;
 	  }
 
     } //________________________________
-	
+
 	@Override
-	protected void doYourJob(DomElement p_oP) throws Exception
+	protected Object preProcess(Object p_o) throws Exception 
 	{
-		for (DomElement oCurr : getTriggers())
-		{
-			if (m_iQthr >= m_iMaxThr)
-	        {
-	          m_oLogger.info(m_sb.append("Waiting for available threads").toString());
-	          Thread.sleep(5000);
-	          break;
-	        }
-
-			SqlChildProcess oNew = getSqlChildProcess(this,oCurr);
-	        new Thread(m_oThrGrp,oNew).start();
-	        // Wait a little bit, so thread count will be updated
-	        // at some point in the past, this sleep was indispensable
-	        // new thread control classes in Java 5 might have solved the problem
-	        Thread.sleep(500);
-		}
-		
+		return p_o;
 	} //________________________________
-	
-	protected List<DomElement> getTriggers() throws Exception
-	{	
-		JdbcCleanConn oConn = newDbConn();
-		String sScan = scanStatement();
 
-		PreparedStatement PS = oConn.prepareStatement(sScan);
-		ResultSet RS = oConn.execQueryWait(PS,1);
+	@Override
+	protected List<Object> pollForCandidates() 
+	{
+		String sSel4U	= selectForUpdStatement();
+		String sUpdStmt	= updateStatement();
+		JdbcCleanConn	oConn	 = null;
+		List<Object> 	oResults = new ArrayList<Object>();
+		try
+		{
+			oConn = newDbConn();
+			String sScan = scanStatement();
 
-		ResultSetMetaData oMeta =  RS.getMetaData();
-		String[] saColName = new String[oMeta.getColumnCount()];
-		for (int i1=0; i1<saColName.length; i1++)
-			saColName[i1] = oMeta.getColumnName(1+i1);
-		
-		List<DomElement> oResults = new ArrayList<DomElement>();
-		while (RS.next())
-		{	DomElement oNew = new DomElement(AbstractProcessor.PARMS_THIS_INSTANCE);
-			for (String sCurrCol : saColName)
-				oNew.setAttr(sCurrCol,RS.getString(sCurrCol));
-			oResults.add(oNew);
+			PreparedStatement PS = oConn.prepareStatement(sScan);
+			ResultSet RS = oConn.execQueryWait(PS,1);
+			while (RS.next())
+			{	Map<String,Object> oColVals = new HashMap<String,Object>();
+				int iCurr = 0;
+				for (String sColName : m_saCols)
+					oColVals.put(sColName,RS.getObject(++iCurr));
+
+				// Set up the parameter object for the SqlRowAction
+				AbstractSqlRowAction.Params oActionP = new AbstractSqlRowAction.Params();
+				oActionP.omVals		= oColVals;
+				oActionP.sUpdStates	= m_sUpdStates;
+				oActionP.saCols		= m_saCols;
+				oActionP.saKeys		= m_saKeys;
+				oActionP.sSel4Upd	= sSel4U;
+				oActionP.sUpdate	= sUpdStmt;
+				
+				oResults.add(oActionP);
+			}
 		}
-		oConn.release();
+		catch (Exception e)
+		{
+			m_oLogger.warn("Some triggers might not have been returned",e);
+		}
+		finally
+		{
+			if (null!=oConn)
+				oConn.release();
+		}
 		
 		return oResults;
 	} //________________________________
@@ -416,187 +362,4 @@
 		return sb.append(" for update").toString();
 	} //________________________________
 
-  } //______________________________________________________
-
-  /**
-   * The child process group will try to start a new thread for each
-   * element in the ResultSet returned by the scanStatement()
-   * The controlling SqlPollerChildGroup will not start a thread if
-   * "maxThreads" is exceeded
-   * The idea is to obtain a new instance of the "actionClass" for each
-   * row of the resultSet
-   * Contents of the table field indicated by the "inProcessField" parameter are used
-   * to control concurrency  (see enum ROW_STATE for details)
-   * 
-   * @author Esteban
-   *
-   */
-  protected class SqlChildProcess extends Observable implements Runnable
-  { 
-	protected SqlPollerChildGroup m_oParent;  // you can always go there for common stuff
-	protected DomElement	m_oInstP;	// values for this instance
-	protected JdbcCleanConn m_oConn;	// each child has it's own DB connection
-	protected PreparedStatement m_PSsel4U	,m_PSupd;
-	protected String		m_sUpdStates;
-	protected String getStatus(ROW_STATE p_oState)
-	{	int iPos = p_oState.ordinal();
-		return m_sUpdStates.substring(iPos,++iPos);
-	} //________________________________
-
-  	public SqlChildProcess(SqlPollerChildGroup p_oGrp, DomElement p_oP)
-  		throws Exception
-  	{
-      m_oParent	= p_oGrp;
-      this.addObserver(m_oParent);
-      setChanged();
-      // add 1 to child thread count
-      notifyObservers(new Integer(1));
-
-      m_oInstP	= p_oP;
-  	  m_oConn	= p_oGrp.newDbConn();
-  	  m_sUpdStates	= m_oVals.get(OPTIONAL_ATT.inProcessVals.toString());
-  	} //__________________________________
-
-	 public void run()
-	 {
-       Exception oAbend = null;
-	   try
-	   {	m_PSsel4U	= m_oConn.prepareStatement(m_oParent.selectForUpdStatement());
- 			m_PSupd		= m_oConn.prepareStatement(m_oParent.updateStatement());
-
- 			int iParm=1;
-	  		for (String sCurr : m_oParent.m_saKeys)
-	  		{	String sVal = m_oInstP.getAttr(sCurr);
-	  			m_PSsel4U.setString	(iParm	,sVal);
-	  			// parameters are +1 in update statement
-	  			// autoincrement leaves things ready for next SQL parameter
-	  			m_PSupd.setString	(++iParm,sVal);
-	  		}
-	  		// will only continue if it can change status to "Working"
-	  		if (! changeStatus(ROW_STATE.Pending,ROW_STATE.Working))
-	  			oAbend = new Exception("Unable to change status to Working");
-	   }
-	   catch (Exception e)
-	   {	m_oLogger.error("Problems with update statements",e);
-	   		if (null!=m_oConn)
-	   		{	try {  m_oConn.rollback(); }
-	   			catch (Exception eR) {  /* OK  do nothing  */}
-	   			m_oConn.release();
-	   		}
-	   		oAbend = e;
-	   }
-
-	   if (null==oAbend)
-	   try
-	   {
-		   Constructor oCnst = m_oParent.m_oExecClass
-		   		.getConstructor	(new Class[]  {DomElement.class});
-		   DomElement oParms = m_oParent.m_oChParms.cloneObj();
-		   oParms.addElemChild(m_oInstP);
-		   Object oInst = oCnst.newInstance (new Object[] {oParms});
-	       ((AbstractProcessor)oInst).execute();
-	       changeStatus(ROW_STATE.Working,ROW_STATE.Done);
-	   }
-	   catch (Exception e) 
-	   { m_oLogger.error("run() FAILED",e);
-       	 try
-       	 { 	m_oConn.rollback();
-       		changeStatus(null,ROW_STATE.Error); 
-       	 }
-       	 catch (Exception CS) { /*  What could we do here ?  */}
-       	 oAbend = e;
-	   }
-	   
-	   finally
-	   {  if (null!=m_oConn)
-		   	m_oConn.release();
-	   }
-
-	   if (null==oAbend)
-		   notifyOK();
-	   else
-		   notifyError(oAbend);
-
-	   setChanged();
-	   // decrease child thread count in parent group
-	   notifyObservers(new Integer(-1));
-	  } //______________________________
-	 
-	  private boolean changeStatus (ROW_STATE pFrom, ROW_STATE pTo) throws Exception
-	  {	ResultSet RS = m_oConn.execQueryWait(m_PSsel4U,5);
-	  	if (! RS.next())
-	  		return false;
-	  	if (null!=pFrom)
-	  	{	String sOldStatus = RS.getString(1).substring(0,1);
-	  		if (!sOldStatus.equalsIgnoreCase(getStatus(pFrom)))
-	  		{	m_oConn.rollback();
-	  			return false;
-	  		}
-	  	}
-	  	m_PSupd.setString(1,getStatus(pTo));
-	  	m_oConn.execUpdWait(m_PSupd,5);
-	  	m_oConn.commit();
-	  	
-		return true;
-	  } //______________________________
-	  
-	  public void notifyOK()
-	  { try
-	    { 
-		  java.io.Serializable oNotif = getOkNotifContent();
-		  for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
-	      { NotificationList oNL = new NotificationList(oCurr);
-	        if (! oNL.isOK())    continue;
-	        getNotifHandler().sendNotifications(oCurr,oNotif);
-	      }
-	    }
-	    catch (Exception e) {}
-	  } //__________________________________
-
-	  public void notifyError(Exception p_e)
-	  { 
-		Serializable oNotif = getErrorNotifContent();
-		ByteArrayOutputStream oBO = new ByteArrayOutputStream();
-	    PrintStream oPS = new PrintStream(oBO);
-	    try
-	    { oPS.println(oNotif);
-	      if (null != p_e) p_e.printStackTrace(oPS);
-	      oPS.close();
-
-	      String sMsg = oBO.toString();
-	      for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
-	      { NotificationList oNL = new NotificationList(oCurr);
-	        if (! oNL.isErr())    continue;
-	        getNotifHandler().sendNotifications(oNL,sMsg);
-	      }
-	    }
-	    catch (Exception e) { }
-	  } //__________________________________
-
-	  protected InotificationHandler getNotifHandler()
-	  {
-		try {	return NotificationHandlerFactory.getNotifHandler
-		  			("remote"
-		  			,SystemProperties.getJndiServerType()
-		  			,SystemProperties.getJndiServerURL()
-		  			);
-			}
-		catch (Exception e)
-			{	m_oLogger.error(formatLogMsg("Notification FAILED"),e);
-				return null;
-			}
-	  } //______________________________
-	  
-	  // These methods to be overriden by you own derived class
-	  protected Serializable getOkNotifContent()
-	  {
-		  return "Success";
-	  }
-	  protected Serializable getErrorNotifContent()
-	  {
-		  return "FAILURE";
-	  }
-
-  } //______________________________________________________
-
 } //____________________________________________________________________________

Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,25 +0,0 @@
-package org.jboss.soa.esb.listeners;
-
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.processors.*;
-
-public class ZZDummyProcessor extends AbstractProcessor
-{
-	public ZZDummyProcessor(DomElement p_oParms) throws Exception
-	{	super(p_oParms);
-	}
-
-	@Override
-	public void execute() throws Exception 
-	{
-		System.out.println(getClass().getName()+"  "
-		+ m_oParms.toString()
-		);
-	}
-
-	@Override
-	protected void checkParms() throws Exception 
-	{
-	}
-
-} //____________________________________________________________________________

Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java	2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java	2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,53 +0,0 @@
-package org.jboss.soa.esb.listeners;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.jboss.soa.esb.helpers.*;
-
-public class ZZderivedSqlTablePoller extends SqlTablePoller
-{
-	  public static void main(String[] args) throws Exception
-	  	{	new ZZderivedSqlTablePoller(args[0]); }
-
-	  public ZZderivedSqlTablePoller(String pParamsUid) throws Exception
-	  {	super(pParamsUid);
-	  } //________________________________
-	  
-	  protected SqlChildProcess getSqlChildProcess
-  			(SqlPollerChildGroup pDad, DomElement p_oParms) throws Exception
-  	  {
-  		return new ZZChildProcess(pDad, p_oParms);
-  	  } //________________________________
-	  
-	  private static final SimpleDateFormat s_oFmt 
-	  		= new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.mmm");
-
-	  protected class ZZChildProcess extends SqlChildProcess
-	  { 
-		  public ZZChildProcess(SqlPollerChildGroup p_oGrp, DomElement p_oP)
-	  		throws Exception
-	  		{	super(p_oGrp,p_oP); }
-		  
-		  protected String getMsgPfx()
-		  {	return ZZderivedSqlTablePoller.class.getSimpleName()
-			  +" "+s_oFmt.format(new Date(System.currentTimeMillis()))+" ";
-		  }
-
-		  // object m_oInstP contains parameters unique to this instance
-		  // object m_oParent.m_oChParms contains parameters common to all child threads
-		  // of the parent child group
-		  protected String getOkNotifContent()
-		  {
-			  return getMsgPfx()+" OK "+m_oInstP.toString();
-		  }
-		  
-		  protected String getErrorNotifContent()
-		  {		
-			  return getMsgPfx()+" eeeeerrrrrrrrr "+m_oInstP.toString();  
-		  }
-
-	  } //__________________________________________________
-
-	  
-} //____________________________________________________________________________




More information about the jboss-svn-commits mailing list