[jboss-svn-commits] JBL Code SVN: r5590 - in labs/jbossesb/trunk/product/core/listeners/src: . org/jboss/soa/esb org/jboss/soa/esb/actions org/jboss/soa/esb/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Aug 7 22:20:28 EDT 2006
Author: estebanschifman
Date: 2006-08-07 22:20:20 -0400 (Mon, 07 Aug 2006)
New Revision: 5590
Added:
labs/jbossesb/trunk/product/core/listeners/src/log4j.properties
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Removed:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
Heavy refactoring of the listeners package.
New package prg.jboss.soa.esb.actions added
Added: labs/jbossesb/trunk/product/core/listeners/src/log4j.properties
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/log4j.properties 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/log4j.properties 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,7 @@
+### direct log messages to stdout ###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+### set log levels - for more verbose logging change 'info' to 'debug' ###
+log4j.rootLogger=info, stdout
\ No newline at end of file
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,25 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.util.Observable;
+import org.apache.log4j.Logger;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+public abstract class AbstractAction extends Observable
+ implements Runnable
+{
+ public abstract void processCurrentObject() throws Exception;
+ public abstract Serializable getOkNotification();
+ public abstract Serializable getErrorNotification();
+
+ DomElement m_oParms;
+ Object m_oCurr;
+ Logger m_oLogger = Logger.getLogger(this.getClass());
+
+ protected AbstractAction(DomElement p_oP, Object p_oCurr)
+ { m_oParms = p_oP;
+ m_oCurr = p_oCurr;
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractFileAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,74 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.File;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+
+public abstract class AbstractFileAction extends AbstractAction
+{
+/**
+ * Extend this class if you need to implement an action class intended to
+ * process one file at a time
+ * <br/>See FileCopier as an example
+ * @param p_oP DomElement - Parameter tree passed by controlling listener
+ * @param p_oCurr Object - This is the object that's going to get processed
+ * <br/>All classes that extend AbstractFileAction receive an instance of the internal
+ * <br/>class Params containing info needed to rename the file in different stages
+ * <br/>of processing
+ */
+ protected AbstractFileAction(DomElement p_oP, Object p_oCurr)
+ { super(p_oP,p_oCurr);
+ } //________________________________
+
+ public static class Params
+ {
+ public boolean bPostDelete;
+ public File oInpF ,oWrkF ,oErrF ,oDoneF;
+ public String toString() { return oInpF.toString(); }
+ } //________________________________
+
+ public boolean isPostDelete() { return ((Params)m_oCurr).bPostDelete; }
+ public File getInputFile() { return ((Params)m_oCurr).oInpF; }
+ public File getWorkFile() { return ((Params)m_oCurr).oWrkF; }
+ public File getErrorFile() { return ((Params)m_oCurr).oErrF; }
+ public File getDoneOkFile(){ return ((Params)m_oCurr).oDoneF; }
+
+ public boolean renameToError() { return getWorkFile().renameTo(getErrorFile()); }
+ public boolean renameToDone () { return getWorkFile().renameTo(getDoneOkFile()); }
+
+/**
+ * Overrides run() in AbstractAction
+ * <p/>Files processed by action classes need to be renamed during processing to
+ * disable other listeners (or other threads launched by the same listener that
+ * started this thread) to pick up the same file
+ * <br/>Once processing ends a suffix will be added to the name of the original
+ * file that has been processed. The suffix will be different according to the
+ * result of processing (OK or Exception). Files could be moved to different
+ * directories as well
+ * <br/> Parameters for these options can be provided at run time in the
+ * DomElement (arg 0 in constructor)
+ */
+ public void run()
+ {
+ try
+ {
+ processCurrentObject();
+ if (isPostDelete())
+ getWorkFile().delete();
+ else
+ renameToDone();
+ GpListener.notifyOK(m_oParms,getOkNotification());
+ }
+ catch (Exception e)
+ {
+ renameToError();
+ GpListener.notifyError(m_oParms,e,getErrorNotification());
+ }
+ finally
+ { setChanged();
+ notifyObservers(new Integer(-1));
+ }
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/AbstractSqlRowAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,134 @@
+package org.jboss.soa.esb.actions;
+
+import java.util.*;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import javax.sql.DataSource;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.listeners.SqlTablePoller.ROW_STATE;
+
+public abstract class AbstractSqlRowAction extends AbstractAction
+{
+ protected JdbcCleanConn m_oConn;
+ protected PreparedStatement m_PSsel4U ,m_PSupd;
+
+ protected AbstractSqlRowAction(DomElement p_oP, Object p_oCurr) throws Exception
+ {
+ super(p_oP,p_oCurr);
+ DataSource oDS = new SimpleDataSource(m_oParms);
+ m_oConn = new JdbcCleanConn(oDS);
+ m_PSsel4U = m_oConn.prepareStatement(getSel4Upd());
+ m_PSupd = m_oConn.prepareStatement(getUpdStmt());
+
+ int iParm=1;
+ for (String sColName : getKeys())
+ {
+ Object oVal = getColumnValue(sColName);
+ m_PSsel4U.setObject (iParm ,oVal);
+ // parameters are +1 in update statement
+ // autoincrement leaves things ready for next SQL parameter
+ m_PSupd.setObject (++iParm,oVal);
+ }
+ } //________________________________
+
+ public static class Params
+ {
+ public String sUpdStates;
+ public String[] saCols ,saKeys;
+ public String sSel4Upd,sUpdate;
+ public Map<String,Object> omVals;
+ public String toString() { return omVals.toString(); }
+ } //________________________________
+
+ protected String getSel4Upd()
+ { return ((Params)m_oCurr).sSel4Upd;
+ } //________________________________
+
+ protected String getUpdStmt()
+ { return ((Params)m_oCurr).sUpdate;
+ } //________________________________
+
+ protected String getStatus(ROW_STATE p_oState)
+ { int iPos = p_oState.ordinal();
+ return ((Params)m_oCurr).sUpdStates.substring(iPos,++iPos);
+ } //________________________________
+
+ protected Object getColumnValue(String p_sKey)
+ { return ((Params)m_oCurr).omVals.get(p_sKey);
+ } //________________________________
+
+ protected String[] getKeys()
+ { return ((Params)m_oCurr).saKeys;
+ } //________________________________
+
+ private boolean changeStatus (ROW_STATE pFrom, ROW_STATE pTo) throws Exception
+ { ResultSet RS = m_oConn.execQueryWait(m_PSsel4U,5);
+ if (! RS.next())
+ return false;
+ if (null!=pFrom)
+ { String sOldStatus = RS.getString(1).substring(0,1);
+ if (!sOldStatus.equalsIgnoreCase(getStatus(pFrom)))
+ { m_oConn.rollback();
+ return false;
+ }
+ }
+ m_PSupd.setString(1,getStatus(pTo));
+ m_oConn.execUpdWait(m_PSupd,5);
+ m_oConn.commit();
+
+ return true;
+ } //______________________________
+
+ public void run()
+ {
+ // will only continue if it can change status to "Working"
+ try
+ {
+ if (! changeStatus(ROW_STATE.Pending,ROW_STATE.Working))
+ { m_oLogger.warn("Unable to change status to Working");
+ m_oConn.rollback();
+ }
+ m_oConn.commit();
+ }
+ catch(Exception e)
+ { m_oLogger.error("Unable to change status to Working",e);
+ return;
+ }
+
+ try
+ {
+ processCurrentObject();
+ changeStatus(ROW_STATE.Working,ROW_STATE.Done);
+ m_oConn.commit();
+ GpListener.notifyOK(m_oParms,getOkNotification());
+ }
+ catch (Exception e)
+ {
+ try
+ { changeStatus(ROW_STATE.Working,ROW_STATE.Error);
+ m_oConn.commit();
+ }
+ catch (Exception eErr)
+ {
+ m_oLogger.error("Unable to change status to ERROR - Really weird - shouldn't happen");
+ }
+ GpListener.notifyError(m_oParms,e,getErrorNotification());
+ }
+ finally
+ { setChanged();
+ notifyObservers(new Integer(-1));
+ if (null!=m_oConn)
+ { try { m_oConn.rollback(); }
+ catch(Exception e) { /* OK just continue */ }
+ m_oConn.release();
+ }
+ }
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,75 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods
+ * in your own action class
+ *
+ * @author Esteban
+ *
+ */
+public class DummyAction extends AbstractAction
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP
+ * @param p_oCurr
+ */
+ public DummyAction(DomElement p_oP, Object p_oCurr) throws Exception
+ {
+ super(p_oP,p_oCurr);
+ }
+
+ public void run()
+ {
+ try
+ {
+ processCurrentObject();
+ GpListener.notifyOK(m_oParms,getOkNotification());
+ }
+ catch (Exception e)
+ {
+ GpListener.notifyError(m_oParms,e,getErrorNotification());
+ }
+ finally
+ { setChanged();
+ notifyObservers(new Integer(-1));
+ }
+ } //________________________________
+
+ @Override
+ public void processCurrentObject() throws Exception
+ {
+ m_oLogger.info("processObject was called with <<"
+ +m_oCurr.toString()+">>");
+ } //________________________________
+
+ private SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss.SSS");
+ private String getStamp()
+ { return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+ @Override
+ public Serializable getOkNotification()
+ {
+ return getStamp()+" Notif OK - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+ @Override
+ public Serializable getErrorNotification()
+ {
+ return getStamp()+" Notif ERROR - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummyFileAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,57 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods
+ * in your own action class
+ *
+ * @author Esteban
+ *
+ */
+public class DummyFileAction extends AbstractFileAction
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP
+ * @param p_oCurr
+ */
+ public DummyFileAction(DomElement p_oP, Object p_oCurr) throws Exception
+ {
+ super(p_oP,p_oCurr);
+ }
+
+ @Override
+ public void processCurrentObject() throws Exception
+ {
+ m_oLogger.info("processObject was called with <<"
+ +m_oCurr.toString()+">>");
+ } //________________________________
+
+ private SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss.SSS");
+ private String getStamp()
+ { return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+ @Override
+ public Serializable getOkNotification()
+ {
+ return getStamp()+" Notif OK - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+ @Override
+ public Serializable getErrorNotification()
+ {
+ return getStamp()+" Notif ERROR - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/DummySqlRowAction.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,57 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+import java.text.*;
+
+import org.jboss.soa.esb.helpers.DomElement;
+/**
+ * Use this class to tune your XML configurations
+ * <p/>Once your config works with this dummy class, you can
+ * switch to your own action class
+ * <p/>You will have to implement these three methods
+ * in your own action class
+ *
+ * @author Esteban
+ *
+ */
+public class DummySqlRowAction extends AbstractSqlRowAction
+{
+/**
+ * Constructor must always have the configuration tree and the object
+ * that the run() method will process
+ * @param p_oP
+ * @param p_oCurr
+ */
+ public DummySqlRowAction(DomElement p_oP, Object p_oCurr) throws Exception
+ {
+ super(p_oP,p_oCurr);
+ }
+
+ @Override
+ public void processCurrentObject() throws Exception
+ {
+ m_oLogger.info("processObject was called with <<"
+ +m_oCurr.toString()+">>");
+ } //________________________________
+
+ private SimpleDateFormat s_oTS = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss.SSS");
+ private String getStamp()
+ { return s_oTS.format(new java.util.Date(System.currentTimeMillis())); }
+
+ @Override
+ public Serializable getOkNotification()
+ {
+ return getStamp()+" Notif OK - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+ @Override
+ public Serializable getErrorNotification()
+ {
+ return getStamp()+" Notif ERROR - <"
+ +((null==m_oCurr)?"null":m_oCurr.toString())
+ +">";
+ } //________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/FileCopier.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,107 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.soa.esb.actions;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+
+import org.jboss.soa.esb.helpers.*;
+
+public class FileCopier extends AbstractFileAction
+{
+ private static final String TMP_SUFFIX = ".notReady";
+ private static final String PARMS_COPY_TO = "CopyTo";
+ private static final String PARMS_OUTDIR = "copyToDirURI";
+ private static final String PARMS_OUTSFX = "copyToSuffix";
+
+ private File [] m_oaTmpFile;
+ private File [] m_oaOutFile;
+
+ public FileCopier(DomElement p_oP, Object p_oCurr) throws Exception
+ { super(p_oP,p_oCurr);
+ checkMyParms();
+ } //__________________________________
+
+ public void processCurrentObject() throws Exception
+ {
+ // Open & create temp files
+ FileInputStream oFI = new FileInputStream(getInputFile());
+ FileOutputStream []oaFO = new FileOutputStream[m_oaTmpFile.length];
+ for (int i1=0; i1<oaFO.length;i1++)
+ oaFO[i1] = new FileOutputStream(m_oaTmpFile[i1]);
+
+ // Perform Copy
+ byte[] ba = new byte[50000];
+ while (true)
+ { int iQ = oFI.read(ba);
+ if (iQ < 0) break;
+ for (int i1=0; i1<oaFO.length; i1++)
+ oaFO[i1].write(ba,0,iQ);
+ }
+ // Close
+ oFI.close();
+ for (int i1=0; i1<oaFO.length; i1++) oaFO[i1].close();
+
+ // Rename
+ for (int i1=0; i1<m_oaTmpFile.length;i1++)
+ { m_oaOutFile[i1].delete();
+ m_oaTmpFile[i1].renameTo(m_oaOutFile[i1]);
+ }
+ } //__________________________________
+
+ protected void checkMyParms() throws Exception
+ { DomElement[] oaPout = m_oParms.getElementChildren(PARMS_COPY_TO);
+ m_oaTmpFile = new File[oaPout.length];
+ m_oaOutFile = new File[oaPout.length];
+
+ if (oaPout.length<1)
+ { m_oLogger.warn("No output files specified for FileCopier");
+ return;
+ }
+ String sFile = getInputFile().getName();
+
+ for (int i1=0; i1<m_oaTmpFile.length;i1++)
+ { File oDir = new File(new URI(oaPout[i1].getAttr(PARMS_OUTDIR)));
+ m_oaTmpFile[i1] = File.createTempFile(sFile, TMP_SUFFIX, oDir);
+ String sCpySuffix = oaPout[i1].getAttr(PARMS_OUTSFX);
+ m_oaOutFile[i1] = new File(oDir,getInputFile().getName()+sCpySuffix);
+ }
+ return;
+ } //__________________________________
+
+ @Override
+ public Serializable getOkNotification()
+ {
+ return "File "+getInputFile()+" successfully copied to all destinations";
+ } //__________________________________
+
+ @Override
+ public Serializable getErrorNotification()
+ {
+ return "Problems copying "+getInputFile()+" to configured destinations";
+ } //__________________________________
+
+} //____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -24,345 +24,138 @@
package org.jboss.soa.esb.listeners;
import java.util.*;
+import java.lang.reflect.Constructor;
import org.apache.log4j.*;
-import javax.naming.*;
-import javax.jms.*;
-
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.actions.AbstractAction;
import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.processors.AbstractProcessor;
-public abstract class AbstractPoller
+public abstract class AbstractPoller implements Runnable, Observer
{
+ protected abstract List<Object> pollForCandidates();
+ protected abstract Object preProcess (Object p_o) throws Exception;
- protected abstract GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception;
-
- // You can override these three values at constructor time of your
- // derived class after calling super(String)
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
protected int m_iMinPollMillis = 3000 // minimum polling interval
,m_iDfltPollMillis = 20000 // default polling interval
- ,m_iDfltReloadMillis= 180000 // default interval between
- // parameter reloading
+ ,m_iSleepForThreads = 3000 // default sleep if no threads available
+ ,m_iUpperThreadLimit = 10 // just in case - override if you wish
;
- public static final String PARM_ACTION_CLASS = "actionClass";
-
public static final String PARM_POLL_LTCY = "pollLatencySecs";
- public static final String PARM_RELOAD_LTCY = "parmsReloadSecs";
- public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass";
- public static final String PARM_QUIESCE_TOPIC = "quiesceTopic";
- public static final String PARM_QUIESCE_SELECTOR = "quiesceSelector";
+ protected int m_iQthr = 0, m_iMaxThr;
+ protected int m_iPollMillis;
- protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
- protected Map<String,GroupOfChilds> m_omChildPrc
- = new HashMap<String,GroupOfChilds>();
+ protected ThreadGroup m_oThrGrp = null;
- protected ParamRepository m_oParmRepos;
- protected String m_oParmName;
protected Logger m_oLogger;
+ protected GpListener m_oDad;
protected DomElement m_oParms;
- protected boolean m_bEndRequested;
-
- protected TopicConnection m_oTopicConn;
- protected TopicSession m_oSession;
- protected Topic m_oTopic;
- protected TopicSubscriber m_oTopicSubs;
-
-
+ protected Class m_oExecClass;
- protected AbstractPoller(String p_sParamsUid) throws Exception
+ protected AbstractPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
{
- m_oLogger = Util.getDefaultLogger(this.getClass());
-
- m_oParmRepos = ParamRepositoryFactory.getInstance();
- m_oParmName = p_sParamsUid;
+ m_oLogger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ m_oParms = p_oParms.cloneObj();
+ checkParms();
} //__________________________________
-
- protected void runUntilEndRequested() throws Exception
- { while (! m_bEndRequested)
- { try
- { String sMsg = (null == m_oParms)
- ? "Initial Parameter loading" : "Reloading Params";
- m_oLogger.info(formatLogMsg(sMsg));
- m_oParms = DomElement.fromXml(m_oParmRepos.get(m_oParmName));
- }
- catch (Exception e)
- {
- StringBuilder sb = new StringBuilder ("Problems loading params ")
- .append(m_oParmName)
- .append((null==m_oParms)? " exiting..." : "continuing to use cached params")
- ;
- m_oLogger.error(formatLogMsg(sb.toString()));
- if (null==m_oParms)
- throw e;
- }
- quiesceTopicSubscribe();
- executeOneCycle();
- }
- } //__________________________________
-
- private void executeOneCycle() throws Exception
+/**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception - if actionClass not specified or not in classpath
+ * or invalid int values for maxThreads or pollLatencySecs
+ */
+ protected void checkParms() throws Exception
{
- String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
- long lNewLoad = System.currentTimeMillis()
- + ( (null != sAtt)
- ? (1000 * Integer.parseInt(sAtt))
- : m_iDfltReloadMillis
- );
+ String sAtt = GpListener.obtainAtt(m_oParms
+ ,GpListener.PARM_ACTION_CLASS,null);
+ m_oExecClass = GpListener.checkActionClass(sAtt);
+
+ sAtt = GpListener.obtainAtt(m_oParms
+ ,GpListener.PARM_MAX_THREADS,"1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
- DomElement[] oaParms = m_oParms.getAllElemChildren();
+ sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
+ m_iPollMillis = (null==sAtt) ? m_iDfltPollMillis
+ : 1000 * Integer.parseInt(sAtt);
+ if (m_iPollMillis < m_iMinPollMillis)
+ m_iPollMillis = m_iMinPollMillis;
+ } //________________________________
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- long lPollLtcy = (null != sAtt)
- ? (1000 * Integer.parseInt(sAtt))
- : m_iDfltPollMillis;
-
- if (lPollLtcy < m_iMinPollMillis)
- lPollLtcy = m_iMinPollMillis;
-
- boolean bFirst = true;
- while (System.currentTimeMillis() <= lNewLoad)
- {
- for (DomElement oCurr : oaParms)
+ /**
+ * Implementation of Observer interface
+ * <p/> Just count the number of active child threads
+ *
+ */
+ public void update(Observable p_oObs, Object p_oUsrObj)
{
- oneScan(oCurr, bFirst);
- }
- long lSlack = lNewLoad - System.currentTimeMillis();
- if (lSlack < 0)
- {
- break;
- }
- if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
- { m_bEndRequested = true;
- return;
- }
- bFirst = false;
- }
- } //_________________________________________
+ if (p_oUsrObj instanceof Integer)
+ m_iQthr += ((Integer) p_oUsrObj).intValue();
+ } //________________________________
- protected String formatLogMsg(String p_s)
- { return new StringBuilder("Processor '")
- .append(getClass().getSimpleName()).append("' <")
- .append(p_s).append(">")
- .toString();
- } //__________________________________
-
- private final void quiesceTopicSubscribe() throws JMSException, NamingException
+ /**
+ * Implement run method for this Runnable
+ * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
+ * no more looping allowed for all child classes
+ * <p/> This condition will not prevent child processes to finish normally
+ */
+ public void run()
{
- try
- {
- m_oTopicConn = null;
- m_oSession = null;
- m_oTopic = null;
- m_oTopicSubs = null;
-
- String sStopTopic = m_oParms.getAttr(PARM_QUIESCE_TOPIC);
- if (Util.isNullString(sStopTopic))
- return;
- String sFactClass = m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
- if (Util.isNullString(sFactClass))
- sFactClass = "ConnectionFactory";
+ m_oThrGrp = new ThreadGroup(m_oParms.getName());
+ while (m_oDad.continueLooping())
+ {
+ List <Object> olPending = pollForCandidates();
+ if (olPending.size() < 1)
+ { try { Thread.sleep(m_iPollMillis); }
+ catch (InterruptedException e) { return; }
+ continue;
+ }
- String sJndiType = SystemProperties.getJndiServerType();
- String sJndiURL = SystemProperties.getJndiServerURL();
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
- Object tmp = oJndiCtx.lookup(sFactClass);
- TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+ for (Object oCurr : olPending)
+ {
+ if (m_iQthr >= m_iMaxThr)
+ { m_oLogger.info("Waiting for available threads...(max="
+ +m_iMaxThr+")");
+ try { Thread.sleep(m_iSleepForThreads); }
+ catch (InterruptedException e) {return; }
+ break;
+ }
- m_oTopicConn = tcf.createTopicConnection();
- m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
- m_oSession = m_oTopicConn.createTopicSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oTopicConn.start();
+ // give the derived class an opportunity to do something
+ // before processing current object
+ Object oProcess = null;
+ try
+ { if (null==(oProcess = preProcess(oCurr)))
+ continue;
+ }
+ catch (Exception ePre)
+ { m_oLogger.error("preProcess(Object) FAILED",ePre);
+ continue;
+ }
- String sSelector = m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
- if (Util.isNullString(sSelector))
- sSelector = "processor='"+getClass().getSimpleName()+"'";
- m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
- }
- catch (Exception e)
- { m_oLogger.error("Problems connecting to JMS. ",e);
- }
-
- } //_________________________________________
-
- protected boolean waitForQuiesce(long p_lMillis) throws Exception
- {
- try
- { boolean bSleep = (null== m_oTopicSubs);
- Object oMsg = (bSleep) ? null : secureReceive(p_lMillis);
-
- if (null!=oMsg)
- { m_oLogger.info("Starting Quiesce of "
- +getClass().getSimpleName());
- return true;
- }
- if (bSleep)
- Thread.sleep(p_lMillis);
- return false;
-
- }
- catch (Exception e)
- { m_oLogger.error("Problems with waitForQuiesce. ",e);
- Thread.sleep(p_lMillis);
- return false;
- }
- } //_________________________________________
-
- private Message secureReceive(long p_lMillis) throws Exception
- {
- while (true)
- try
- { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); }
- catch (JMSException e)
- {
- // put here your recovery code
- return null;
- }
-
- } //_________________________________________
-
-
- protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
- {
- String sPrcName = p_oP.getName();
- if (!m_omChildPrc.containsKey(sPrcName))
- {
- ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
- int iMax = m_oThrGrp.enumerate(oaCh);
-
- ThreadGroup oThG = null;
- for (int i1 = 0; null == oThG && i1 < iMax; i1++)
- { if (m_oThrGrp.getName().equals(sPrcName))
- oThG = oaCh[i1];
- }
- if (null == oThG)
- oThG = new ThreadGroup(sPrcName);
- m_omChildPrc.put(sPrcName, newChildGroup(oThG));
- }
- GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
-
- if (null == oChildGrp) return;
- if (p_bFirst)
- oChildGrp.m_bError = false;
-
- try
- {
- oChildGrp.execute(p_oP);
- }
- catch (Exception e)
- {
- oChildGrp.m_bError = true;
- m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
- }
- } //_________________________________________
-
- protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
- throws Exception
- {
- String sVal = p_oP.getAttr(p_sAtt);
- if (null==sVal)
- sVal = p_sDefault;
- if (Util.isNullString(sVal) && (null==p_sDefault))
- throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
-
- return sVal;
- } //________________________________
-
- protected abstract class GroupOfChilds implements Observer
- {
- protected abstract void doYourJob(DomElement p_oP) throws Exception;
-
- public static final String PARM_MAX_THREADS = "maxThreads";
-
- protected ThreadGroup m_oThrGrp;
- protected boolean m_bError = false;
-
- protected Class m_oExecClass;
- protected DomElement m_oChParms;
-
- protected int m_iQthr = 0, m_iMaxThr;
- protected StringBuilder m_sb;
- protected int m_iSbIni;
-
- protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
- {
- m_oThrGrp = p_oThrGrp;
- m_sb = new StringBuilder("GroupOfThreads ")
- .append(m_oThrGrp.getName()).append(" : ");
- m_iSbIni = m_sb.length();
- } //________________________________
-
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- {
- updQthreads( ( (Integer) p_oUsrObj).intValue());
- }
- } //________________________________
-
- private synchronized void updQthreads(int p_i)
- {
- m_iQthr += p_i;
- } //________________________________
-
- private void execute(DomElement p_oP) throws Exception
- {
- m_sb.setLength(m_iSbIni);
- if (m_bError)
- {
- m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
- .toString());
- return;
- }
- checkParms(p_oP);
- doYourJob (p_oP);
- } //________________________________
-
- protected void setMaxThreads(DomElement p_oP,int p_iMax)
- {
- String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
- m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
- m_iMaxThr = (m_iMaxThr < 1) ? 1
- : (m_iMaxThr > p_iMax) ? p_iMax
- : m_iMaxThr;
-
- } //________________________________
-
- protected Class checkActionClass(String p_sName) throws Exception
- {
- Class oCls;
- try { oCls = Class.forName(p_sName); }
- catch (ClassNotFoundException e)
- { throw new Exception(formatLogMsg("Class "+p_sName
- +" not found in classpath"));
- }
- try { oCls.getConstructor(new Class[] {DomElement.class}); }
- catch (NoSuchMethodException eN)
- { throw new Exception(formatLogMsg("No appropriate constructor "
- +p_sName+"(DomElement) found for class "));
- }
- return oCls;
- } //_________________________________________
-
- // Very basic checkParms method
- // Remember to call super.checkParms(p_oP) in derived checkParms() methods
- // so your parameters will be cloned
- // and to add REAL parameter checking
- protected void checkParms(DomElement p_oP) throws Exception
- {
- m_sb.setLength(m_iSbIni);
- m_oChParms = p_oP.cloneObj();
- m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
- setMaxThreads(p_oP,10);
- } //________________________________
-
- } //______________________________________________________
-
+ AbstractAction oExec = null;
+ try
+ { Constructor oConst = m_oExecClass
+ .getConstructor(GpListener.getActionClassArgs());
+ oExec = (AbstractAction)oConst
+ .newInstance(new Object[] {m_oParms,oProcess});
+ }
+ catch (Exception e)
+ { m_oLogger.error("Can't instantiate action class",e);
+ break;
+ }
+ // launch an instance of the AbstractAction in a child thread
+ m_iQthr += 1;
+ oExec.addObserver(this);
+ new Thread(oExec).start();
+ }
+ }
+ } //__________________________________
+
} //____________________________________________________________________________
Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,184 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-
-
-package org.jboss.soa.esb.listeners;
-
-import java.io.*;
-import java.lang.reflect.Constructor;
-import java.net.*;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.log4j.Logger;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.processors.*;
-
-public class BetterDirListener extends AbstractPoller
-{
- public static void main(String[] args) throws Exception
- {
- new BetterDirListener(args[0]);
- } //________________________________
-
- public BetterDirListener(String p_sParamsUid) throws Exception
- {
- super(p_sParamsUid);
- m_iDfltReloadMillis = 180000;
- m_iDfltPollMillis = 20000;
- m_iMinPollMillis = 5000;
-// See superclass - It provides ability to request end by subscribing to a Topic
- runUntilEndRequested();
- } //__________________________________
-
- protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
- {
- return new MyChildGroup(pThG);
- } //_________________________________________
-
- private class MyChildGroup extends AbstractPoller.GroupOfChilds
- {
- public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
- public static final String PARM_INPUT_DIR = "inputDirURI";
- public static final String PARM_SUFFIX = "inputSuffix";
-
- private File m_oInpDir;
- private FileFilter m_oFFilt;
-
- private MyChildGroup(ThreadGroup p_oThrGrp) throws Exception
- { super(p_oThrGrp);
- } //________________________________
-
- @Override
- protected void doYourJob(DomElement p_oP) throws Exception
- {
- File[] oaF = m_oInpDir.listFiles(m_oFFilt);
-
- for (File oFcurr : oaF)
- {
- if (m_iQthr >= m_iMaxThr)
- {
- m_oLogger.info(m_sb.append("Waiting for available threads").toString());
- Thread.sleep(5000);
- break;
- }
- m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
- DomElement oThisProc = new DomElement(AbstractProcessor.PARMS_THIS_INSTANCE);
- oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
- m_oChParms.addElemChild(oThisProc);
-
- new Thread(m_oThrGrp,
- new FileChildProcess(m_oExecClass, this, m_oChParms)).start();
- Thread.sleep(500);
- }
- } //________________________________
-
- protected void checkParms(DomElement p_oP) throws Exception
- {
- super.checkParms(p_oP);
-
- String sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
- if (null == sAtt)
- { throw new Exception(formatLogMsg(
- m_sb.append("Missing ").append(PARM_INPUT_DIR)
- .append(" attribute in -parameters ")
- .toString()));
- }
- m_oInpDir = new File(new URI(sAtt));
- if (!m_oInpDir.isDirectory())
- { throw new Exception(formatLogMsg(
- m_sb.append(sAtt).append(" is not a directory").toString()));
- }
- if (!m_oInpDir.canRead())
- { throw new Exception(formatLogMsg(
- m_sb.append("Can't read directory ").append(sAtt).
- toString()));
- }
-
- sAtt = m_oChParms.getAttr(PARM_SUFFIX);
- if (null == sAtt)
- { throw new Exception(formatLogMsg(
- m_sb.append("Missing ").append(PARM_SUFFIX)
- .append(" attribute in -parameters ")
- .toString()));
- }
-
- m_oFFilt = new FileEndsWith(sAtt);
-
- sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
- m_oExecClass = null;
- if (null == sAtt)
- { throw new Exception(formatLogMsg(
- m_sb.append("Missing fileProcessorClass attribute").
- toString()));
- }
- m_oExecClass = super.checkActionClass(sAtt);
-
- } //________________________________
-
-
- private class FileEndsWith implements FileFilter
- {
- String m_sSuffix;
- FileEndsWith(String p_sEnd) throws Exception
- {
- m_sSuffix = p_sEnd;
- if (Util.isNullString(m_sSuffix))
- throw new Exception("Must specify file extension");
- } //_________________________________________
-
- public boolean accept(File p_f)
- { return (p_f.isFile())
- ? p_f.toString().endsWith(m_sSuffix)
- : false;
- } //_________________________________________
- } //___________________________________________________
- } //______________________________________________________
-
- protected static class FileChildProcess extends Observable implements Runnable
- { private Class m_oExecClass;
- private DomElement m_oParms;
- private Logger m_oLogger;
- public FileChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP)
- { m_oLogger = Util.getDefaultLogger(this.getClass());
- m_oExecClass = p_oExec;
- this.addObserver(p_oObs);
- m_oParms = p_oP;
- setChanged();
- notifyObservers(new Integer(1));
- } //__________________________________
-
- public void run()
- { try
- { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class});
- Object oProc = oCnst.newInstance(new Object[] {m_oParms});
- ((FileProcessor)oProc).execute();
- }
- catch (Exception e) { m_oLogger.error("run() FAILED",e); }
-
- setChanged();
- notifyObservers(new Integer(-1));
- } //__________________________________
- } //______________________________________________________
-
-} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,175 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.actions.AbstractFileAction;
+import org.jboss.soa.esb.helpers.*;
+
+public class DirectoryPoller extends AbstractPoller
+{
+ public static final String FILE_INPUT_DIR = "inputDirURI";
+ public static final String FILE_INPUT_SFX = "inputSuffix";
+ public static final String FILE_WORK_SFX = "workSuffix";
+ public static final String FILE_ERROR_DIR = "errorDirURI";
+ public static final String FILE_ERROR_SFX = "errorSuffix";
+ public static final String FILE_POST_DIR = "postDirURI";
+ public static final String FILE_POST_SFX = "postSuffix";
+ public static final String FILE_POST_DEL = "postDelete";
+
+ public DirectoryPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+ {
+ super(p_oDad,p_oParms);
+ checkMyParms();
+ } //__________________________________
+
+
+ protected File m_oInpDir ,m_oErrorDir ,m_oPostDir;
+ protected FileFilter m_oFFilt;
+ protected String m_sInpSfx ,m_sWrkSfx ,m_sErrSfx ,m_sPostSfx;
+ protected boolean m_bPostDel;
+
+ /**
+ *
+ * @param p_o Object - Must be a File representing the file that has to be processed
+ * @return Object - an array of 3 Files containing:
+ * <p/>[0] renamed file (workSuffix appended to input file name)
+ * <p/>[1] target file name in case actionClass is unable to complete successfuly
+ * <p/>[2] target file name in case actionClass finishes successfuly
+ */
+ @Override
+ public Object preProcess(Object p_o) throws Exception
+ {
+ if (!(p_o instanceof File))
+ return null;
+ File oF = (File)p_o;
+ File oNameWrk = new File (oF.getParentFile(),oF.getName()+m_sWrkSfx);
+
+
+ if (! oF.renameTo(oNameWrk))
+ return null;
+ AbstractFileAction.Params oCurr = new AbstractFileAction.Params();
+ oCurr.bPostDelete = m_bPostDel;
+ oCurr.oInpF = oF;
+ oCurr.oWrkF = oNameWrk;
+ oCurr.oErrF = new File (m_oErrorDir ,oF.getName()+m_sErrSfx);
+ oCurr.oDoneF = new File (m_oPostDir ,oF.getName()+m_sPostSfx);
+
+ return oCurr;
+ } //________________________________
+
+ @Override
+ protected List<Object> pollForCandidates()
+ {
+ File[] oaF = m_oInpDir.listFiles(m_oFFilt);
+ return Arrays.asList((Object[])oaF);
+ } //________________________________
+
+ protected void checkMyParms() throws Exception
+ {
+ // INPUT directory and suffix (used for FileFilter)
+ String sInpDir = GpListener.obtainAtt(m_oParms,FILE_INPUT_DIR,null);
+ m_oInpDir = new File(new URI(sInpDir));
+ seeIfOkToWorkOnDir(m_oInpDir);
+
+ m_sInpSfx = GpListener.obtainAtt(m_oParms,FILE_INPUT_SFX,null);
+ m_sInpSfx = m_sInpSfx.trim();
+ if (m_sInpSfx.length()<1)
+ throw new Exception ("Invalid "+FILE_INPUT_SFX+" attribute");
+ m_oFFilt = new FileEndsWith(m_sInpSfx);
+
+ // WORK suffix (will rename in input directory)
+ m_sWrkSfx = GpListener.obtainAtt(m_oParms,FILE_WORK_SFX,".esbWork").trim();
+ if (m_sWrkSfx.length()<1)
+ throw new Exception ("Invalid "+FILE_WORK_SFX+" attribute");
+ if (m_sInpSfx.equals(m_sWrkSfx))
+ throw new Exception("Work suffix must differ from input suffix <"+m_sWrkSfx+">");
+
+ // ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
+ String sErrDir = GpListener.obtainAtt(m_oParms,FILE_ERROR_DIR,sInpDir);
+ m_oErrorDir = new File(new URI(sErrDir));
+ seeIfOkToWorkOnDir(m_oErrorDir);
+
+ m_sErrSfx = GpListener.obtainAtt(m_oParms,FILE_ERROR_SFX,".esbError").trim();
+ if (m_sErrSfx.length()<1)
+ throw new Exception ("Invalid "+FILE_ERROR_SFX+" attribute");
+ if (m_oErrorDir.equals(m_oInpDir) && m_sInpSfx.equals(m_sErrSfx))
+ throw new Exception("Error suffix must differ from input suffix <"+m_sErrSfx+">");
+
+
+ // Do users wish to delete files that were processed OK ?
+ String sPostDel = GpListener.obtainAtt(m_oParms,FILE_POST_DEL,"false").trim();
+ m_bPostDel = Boolean.parseBoolean(sPostDel);
+ if (m_bPostDel)
+ return;
+
+ // POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
+ String sPostDir = GpListener.obtainAtt(m_oParms,FILE_POST_DIR,sInpDir);
+ m_oPostDir = new File(new URI(sPostDir));
+ seeIfOkToWorkOnDir(m_oPostDir);
+ m_sPostSfx = GpListener.obtainAtt(m_oParms,FILE_POST_SFX,".esbDone").trim();
+ if (m_oPostDir.equals(m_oInpDir))
+ { if (m_sPostSfx.length()<1)
+ throw new Exception ("Invalid "+FILE_POST_SFX+" attribute");
+ if (m_sPostSfx.equals(m_sInpSfx))
+ throw new Exception("Post process suffix must differ from input suffix <"+m_sPostSfx+">");
+ }
+
+ } //________________________________
+
+ protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
+ {
+ if (! p_oDir.exists())
+ throw new Exception ("Directory "+p_oDir.toString()+" not found");
+ if (!p_oDir.isDirectory())
+ throw new Exception(p_oDir.toString()+" is not a directory");
+ if (!p_oDir.canRead())
+ throw new Exception("Can't read directory "+p_oDir.toString());
+ if (! p_oDir.canWrite())
+ throw new Exception ("Can't write/rename in directory "+p_oDir.toString());
+ } //________________________________
+
+
+ private class FileEndsWith implements FileFilter
+ {
+ String m_sSuffix;
+ FileEndsWith(String p_sEnd) throws Exception
+ {
+ m_sSuffix = p_sEnd;
+ if (Util.isNullString(m_sSuffix))
+ throw new Exception("Must specify file extension");
+ } //______________________________
+
+ public boolean accept(File p_f)
+ { return (p_f.isFile())
+ ? p_f.toString().endsWith(m_sSuffix)
+ : false;
+ } //______________________________
+ } //____________________________________________________
+
+} //____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -0,0 +1,617 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.lang.reflect.*;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.notification.NotificationList;
+import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Controlling class that will launch listener child threads
+ * for supported transport listener classes, as indicated
+ * in the configuration XML tree pointed by arg[0]
+ *
+ * <p /> Can be launched as uppermost controller (it has a main(args) method)
+ * <p /> Also implements Runnable, and can thus be launched in a child
+ * thread from an upper controlling process
+ * <p /> Listens on a JMS queue (with an optional message selector)
+ * for commands (e.g. Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p /> Parameter reloading can also be set using the PARM_RELOAD_SECS
+ * attribute
+ * <p /> End time for this instance can also be set using the PARM_END_TIME
+ * attribute
+ * <p />
+ *
+ * @author Esteban
+ *
+ */
+public class GpListener implements Runnable
+{
+ public static void main(String[] args) throws Exception
+ {
+ GpListener oProc = new GpListener(args[0]);
+ oProc.run();
+ GpListener.State oS = oProc.getState();
+
+ if (null!=oS.getException())
+ {
+ oProc.m_oLogger.error
+ ("GpListener <"+args[0]+"> FAILED\n",oS.getException());
+ }
+ System.exit(oS.getCompletionCode());
+ } //________________________________
+
+ protected int m_iDfltReloadMillis= 180000 // default interval between parameter reloads
+ ;
+ public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+ public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+ public static final String COMMAND_JNDI_URL = "commandJndiURL";
+ public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+ public static final String COMMAND_JNDI_NAME = "commandJndiName";
+ public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+ public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
+ public static final String PARM_END_TIME = "endTime";
+
+ // Attribute name that denotes listener class to be instantiated in a child thread
+ // This attribute is not in the root node but in first level child DomElements
+ public static final String PARM_LISTENER_CLASS = "listenerClass";
+
+ public static final String PARM_ACTION_CLASS = "actionClass";
+ public static final String PARM_MAX_THREADS = "maxThreads";
+
+
+ private Logger m_oLogger;
+
+ private ParamRepository m_oParmRepos;
+ private String m_sParmsName;
+ private DomElement m_oParms;
+
+ private HashMap<String,Object> m_oAtts;
+ /**
+ * Obtain a shallow copy of needed atributes in this object's last loaded
+ * parameter tree
+ * <p/>The local bject is cloned so child threads can use it as they choose to
+ * without interfering with the environment
+ * <p /> Listener processes controlled by this object should keep a reference
+ * to this object at construction time, and not call this method again unless
+ * they specifically need updated values. Parameter reload could have happened
+ * since last call
+ *
+ * @return Map - a shallow copy of the attributes Map
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String,Object> getControllerAttributes()
+ { return (Map<String,Object>)m_oAtts.clone(); }
+
+ private boolean m_bReloadRequested ,m_bEndRequested;
+ private long m_lNextReload = Long.MAX_VALUE;
+ private long m_lEndTime = Long.MAX_VALUE;
+ public static final SimpleDateFormat s_oDateParse
+ = new SimpleDateFormat("yyyyMMdd hh:mm:ss");
+
+ private State m_oState = null;
+ public State getState() {return m_oState; }
+ public static enum State
+ {
+ Loading_parameters
+ ,Running
+ ,Shutting_down
+ ,Done_OK
+ ,Exception_thrown
+ ;
+ int m_iCompletionCode = 0;
+ Exception m_oException = null;
+ public int getCompletionCode() { return m_iCompletionCode; };
+ public Exception getException() { return m_oException; }
+ };
+
+ private MessageConsumer m_oCmdSrc;
+ private Session m_oJmsSess;
+ private Connection m_oJmsConn;
+
+ public GpListener(String p_sParameterName) throws Exception
+ { m_oLogger = Logger.getLogger(this.getClass());
+ m_sParmsName = p_sParameterName;
+ m_oParmRepos = ParamRepositoryFactory.getInstance();
+
+ m_oState = State.Loading_parameters;
+ try
+ {
+ String sXml = m_oParmRepos.get(m_sParmsName);
+ m_oParms = DomElement.fromXml(sXml);
+ checkParms(m_oParms);
+ }
+ catch (Exception e)
+ { m_oState = State.Exception_thrown;
+ m_oState.m_oException = e;
+ m_oLogger.fatal("Problems with initial parameter load - <"
+ +p_sParameterName+">",e);
+ throw e;
+ }
+ } //________________________________
+ /**
+ * Check to see if all needed parameters are there, and assign default
+ * values to some of them
+ *
+ * @param p_oP DomElement - Where to look for the mandatory/optional
+ * configuration attributes
+ * @throws Exception - If attributes are wrong or not enough for a proper
+ * runtime configuration
+ */
+ public void checkParms(DomElement p_oP) throws Exception
+ {
+ // We've just loaded - set to false until next reload requested
+ m_bReloadRequested = false;
+ m_oCmdSrc = null;
+
+ Map<String,Object> oNewAtts = new HashMap<String,Object>();
+
+ // Only check for JMS attributes if a queue JNDI name was specified
+ String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+ if (! Util.isNullString(sJndiName))
+ {
+ oNewAtts.put(COMMAND_JNDI_NAME,sJndiName);
+
+ String sJndiType = obtainAtt(p_oP,COMMAND_JNDI_TYPE,"jboss");
+ oNewAtts.put(COMMAND_JNDI_TYPE,sJndiType);
+ String sJndiURL = obtainAtt(p_oP,COMMAND_JNDI_URL,"localhost");
+ oNewAtts.put(COMMAND_JNDI_URL,sJndiURL);
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+ String sFactClass = obtainAtt(p_oP,COMMAND_CONN_FACTORY,"ConnectionFactory");
+ oNewAtts.put(COMMAND_CONN_FACTORY,sFactClass);
+ if (Util.isNullString(sFactClass))
+ sFactClass = "ConnectionFactory";
+ Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+ String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+ if (null!=sMsgSelector)
+ oNewAtts.put(COMMAND_MSG_SELECTOR,sMsgSelector);
+
+ boolean bIsTopic = Boolean.parseBoolean
+ (obtainAtt(p_oP,COMMAND_IS_TOPIC,"false"));
+ if (bIsTopic)
+ {
+ TopicConnectionFactory tcf = (TopicConnectionFactory)oFactCls;
+ TopicConnection oTC = tcf.createTopicConnection();
+ Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+ TopicSession oSess = oTC.createTopicSession
+ (false,TopicSession.AUTO_ACKNOWLEDGE);
+ m_oJmsConn = oTC;
+ m_oJmsSess = oSess;
+ oTC.start();
+ m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector,true);
+ }
+ else
+ {
+ QueueConnectionFactory qcf = (QueueConnectionFactory)oFactCls;
+ QueueConnection oQC = qcf.createQueueConnection();
+ javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx.lookup(sJndiName);
+ QueueSession oSess = oQC.createQueueSession
+ (false,TopicSession.AUTO_ACKNOWLEDGE);
+ oQC.start();
+ m_oJmsConn = oQC;
+ m_oJmsSess = oSess;
+ m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+ }
+ }
+
+ // if PARM_RELOAD_SECS not set, and no command queue
+ // then reload every 10 minutes
+ // If there is a command queue, run until command is received
+ String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
+ m_lNextReload = (null!=sRldSecs)
+ ? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
+ : (null==m_oCmdSrc) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + m_iDfltReloadMillis
+ ;
+
+ // if PARM_END_TIME not set try to run forever
+ // not a good practice if command queue is not set
+ // Expected date format is "yyyyMMdd hh:mm:ss"
+ String sEndT = p_oP.getAttr(PARM_END_TIME);
+ m_lEndTime = (null==sEndT) ? Long.MAX_VALUE
+ : s_oDateParse.parse(sEndT).getTime();
+
+ } //________________________________
+
+/**
+ * Main execution loop
+ * <p/> Will continue to run until either
+ * <p/>a) run time is expired
+ * <p/>b) quiesce command is received in command queue
+ * <p/>For every child element that contains a PARM_LISTENER_CLASS
+ * attribute, this method will try to launch a child thread
+ * instantiating an object of that class, and will call it's run()
+ * method
+ * <p/>Once all child processes are trigered, the main thread
+ * will either
+ * <p/>1) wait for a message in the command queue (if one was
+ * configured) until next reload or end of run period expired
+ * <p/>or 2) Just sleep if there's no command queue to listen on
+ */
+ public void run()
+ {
+ while (endNotRequested())
+ {
+ m_oState = State.Running;
+ for (DomElement oCurr : m_oParms.getAllElemChildren())
+ { String sClass = oCurr.getAttr(PARM_LISTENER_CLASS);
+ if (Util.isNullString(sClass))
+ continue;
+ tryToLaunchChildListener(oCurr,sClass);
+ }
+
+ waitForCmdOrSleep();
+
+ if (endRequested())
+ break;
+ if (timeToReload())
+ try
+ {
+ m_oState = State.Loading_parameters;
+ m_oLogger.info("Reloading parameters _____________________________________________________");
+ DomElement oNew = DomElement.fromXml(m_oParmRepos.get(m_sParmsName));
+ checkParms(oNew);
+ m_oParms = oNew;
+ }
+ catch (Exception e)
+ {
+ m_oLogger.error
+ ("Failed to reload parameters"
+ +" - Continuing with cached version",e);
+ }
+ }
+// m_oState = State.Shutting_down;
+
+ m_oState = State.Done_OK;
+ m_oState.m_iCompletionCode = 0;
+ m_oLogger.info("Finishing_____________________________________________________");
+
+ if (null!=m_oJmsSess)
+ try { m_oJmsSess.close(); }
+ catch (JMSException eS) {/* Tried my best - Just continue */}
+ if (null!=m_oJmsConn)
+ try { m_oJmsConn.close(); }
+ catch (JMSException eC) {/* Tried my best - Just continue */}
+ } //________________________________
+
+ private void tryToLaunchChildListener(DomElement p_oP,String p_sClassName)
+ {
+ try
+ { Class oListener = Class.forName(p_sClassName);
+ Constructor oConst = oListener.getConstructor
+ (new Class[] {this.getClass(),DomElement.class});
+ Runnable oRun = (Runnable)oConst.newInstance(new Object[] {this,p_oP});
+ new Thread(oRun).start();
+ }
+ catch (Exception e)
+ {
+ m_oLogger.error("Cannot launch <"+p_sClassName+">\n",e);
+ }
+ } //________________________________
+
+ long millisToWait()
+ {
+ return Math.min(m_lNextReload,m_lEndTime)
+ - System.currentTimeMillis();
+ } //________________________________
+
+ private void waitForCmdOrSleep()
+ {
+ long lToGo = millisToWait();
+
+ if (null==m_oCmdSrc)
+ {
+ m_oLogger.debug("About to sleep "+lToGo);
+ // No command queue nor topic - Just sleep until time
+ // exhausted, or thread interrupted
+ try { Thread.sleep(lToGo); }
+ catch (InterruptedException e)
+ {
+ m_lEndTime = 0; // mark as end requested and return
+ }
+ return;
+ }
+
+ // Wait for commands until time exhausted or command received
+ // Note that received commands might change time variables (reload/end)
+ // that's why time to go is recalculated on each cycle
+ while ((lToGo = millisToWait()) > 0)
+ {
+ try
+ {
+ m_oLogger.info("Waiting for command ... timeout="+lToGo+" millis");
+ // for the time being, only text messages allowed
+ // THIS WILL CHANGE !!
+ Message oM = m_oCmdSrc.receive(lToGo);
+ if (null==oM)
+ return;
+ if (! (oM instanceof TextMessage))
+ {
+ m_oLogger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+ return;
+ }
+ processCommand((TextMessage)oM);
+ if (endRequested() || timeToReload())
+ break;
+ }
+ catch (JMSException eJ)
+ {
+ m_oLogger.info("receive on command queue failed",eJ);
+ }
+ }
+ } //________________________________
+
+/**
+ * Processes the command that has been received in the command queue (or topic)
+ * <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be changed
+ *
+ * <p/>
+ * <p/><TABLE border="1">
+ * <COLGROUP> <COL width="200"/> <COL width="400"/> </COLGROUP>
+ * <TR> <TD align="center">message text</TD><TD align="center">effect</TD> </TR>
+ * <TR>
+ * <TD>shutdown*</TD>
+ * <TD>End time will be immediately set to 'now' - quiesce process will start - Child threads will be allowed to finish normally</TD>
+ * </TR>
+ * <TR>
+ * <TD>reload param*</TD>
+ * <TD>Parameters will be immediately reloaded, and listener reconfigured with new values</TD>
+ * </TR>
+ * <TR>
+ * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+ * <TD>End time will be set to new value.
+ * If hh:mm:ss is not supplied => end of day assumed (23:59:59)</TD>
+ * </TR>
+ *</TABLE> * startsWith()
+ * <p/>
+ * @param p_oMsg TextMessage - Received in command queue/topic
+ *
+ */
+ private void processCommand (TextMessage p_oMsg)
+ {
+ try
+ {
+ String sTxt = p_oMsg.getText();
+ if (null==sTxt)
+ return;
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown"))
+ { m_bEndRequested = true;
+ m_oLogger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param"))
+ { m_bReloadRequested = true;
+ m_oLogger.info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length>1 && "endtime".equals(sa[0]))
+ try
+ { String sDate = sa[1];
+ String sTime =
+ (sa.length<3 || null==sa[2]) ? "23:59:59" : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate+" "+sTime);
+ m_oLogger.info("New end date set to : "+oEnd);
+ m_lEndTime = oEnd.getTime();
+ }
+ catch (Exception eDat)
+ {
+ m_oLogger.info("Problems with endTime command",eDat);
+ }
+ }
+ catch (JMSException eJ)
+ {
+ m_oLogger.info("Problems with command queue",eJ);
+ }
+ } //________________________________
+
+/**
+ * Accessor to determine if execution time is expired or shutdown requested
+ * @return boolean if processing has to stop (all child threads will be allowed to finish)
+ */
+ public boolean endRequested()
+ { return m_bEndRequested
+ || System.currentTimeMillis() >= m_lEndTime;
+ }
+/**
+ * Accessor to determine if execution time is not expired, and no shutdown request received
+ * @return boolean - true if run time has not expired and quiesce
+ * has not been requested
+ */
+ public boolean endNotRequested()
+ { return ! endRequested();
+ }
+
+/**
+ * Provide a common accessor to determine if parameters have to be reloaded
+ * <p/> For child threads this means thread execution has to end
+ * </p> Child processes should only call this method when they are idle
+ * (as opposed to in the middle of executing a unit of work)
+ * @return boolean - true if it's time to reload parameters
+ */
+ public boolean timeToReload()
+ { return m_bReloadRequested
+ || System.currentTimeMillis() >= m_lNextReload;
+ }
+
+/**
+ * Helper accessor for child processes that provides info to determine if
+ * they can continue with yet another execution cycle
+ * @return boolean - true if runtime is not expired and not time yet to reload
+ * parameters
+ */
+ public boolean continueLooping()
+ {
+ return (endNotRequested() && ! timeToReload());
+ } //________________________________
+
+/**
+ * Find an attribute in the tree (arg 0) or assign default value (arg 2)
+ *
+ * @param p_oP DomElement - look for attributes in this Element only
+ * @param p_sAtt String - Name of attribute to find
+ * @param p_sDefault String -default value if requested attribute is not there
+ * @return String - value of attribute, or default value (if null)
+ * @throws Exception - If requested attribute not found and no default value
+ * supplied by invoker
+ */
+ static String obtainAtt
+ (DomElement p_oP, String p_sAtt, String p_sDefault)
+ throws Exception
+ {
+ String sVal = p_oP.getAttr(p_sAtt);
+ if ((null==sVal) && (null==p_sDefault))
+ throw new Exception("Missing or invalid <"+p_sAtt+"> attribute");
+
+ return (null!=sVal) ? sVal : p_sDefault;
+ } //________________________________
+
+ private static Class[] s_oaActionConstr = {DomElement.class, Object.class};
+ public static Class[] getActionClassArgs() { return s_oaActionConstr; }
+
+ /**
+ * Check to see if an object of the class (arg 0) can be instantiated
+ * in this context
+ *
+ * @param p_sName String - class name to instantiate
+ * - Must implement org.jboss.soa.esb.listeners.AbstractActionClass
+ * @return Class -
+ * @throws Exception - if class not found in path or no appropriate constructor
+ */
+ protected static Class checkActionClass(String p_sName) throws Exception
+ {
+ Class oCls;
+ try
+ { oCls = Class.forName(p_sName);
+ }
+ catch (ClassNotFoundException e)
+ { throw new Exception("Class "+p_sName
+ +" not found in classpath");
+ }
+
+ try
+ { oCls.getConstructor(s_oaActionConstr);
+ }
+ catch (NoSuchMethodException eN)
+ { throw new Exception("No appropriate constructor "
+ +p_sName+"(DomElement,Object) found for class ");
+ }
+ try { oCls.asSubclass(AbstractAction.class); }
+ catch (ClassCastException eCC)
+ { throw new Exception("class "+p_sName
+ + " does not extend "+AbstractAction.class.getName());
+ }
+ return oCls;
+ } //_________________________________________
+
+/**
+ * Find child nodes named "NotificationList" that contain an attribute 'type' that
+ * starts with "ok" (case insensitive)
+ * @param p_oP - DomElement to search for "NotificationList" child Elements
+ * @param p_oSer Serializable - Will constitute the body of the notification
+ */
+ public static void notifyOK(DomElement p_oP, Serializable p_oSer)
+ { try
+ {
+ Serializable oNotif = p_oSer;
+ for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT))
+ { NotificationList oNL = new NotificationList(oCurr);
+ if (! oNL.isOK()) continue;
+ getNotifHandler().sendNotifications(oCurr,oNotif);
+ }
+ }
+ catch (Exception e) {}
+ } //__________________________________
+
+ /**
+ * Find child nodes named "NotificationList" that contain an attribute 'type' that
+ * starts with "err" (case insensitive) or no 'type' attribute set
+ * @param p_oP - DomElement to search for "NotificationList" child Elements
+ * @param p_e - Exception if not null, will be appended to the body
+ * @param p_oSer Serializable - Will be included at the beginning of the body
+ * of the notification
+ */
+ public static void notifyError(DomElement p_oP,Exception p_e, Serializable p_oSer)
+ {
+ Serializable oNotif = p_oSer;
+ ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+ PrintStream oPS = new PrintStream(oBO);
+ try
+ { oPS.println(oNotif.toString());
+ if (null != p_e) p_e.printStackTrace(oPS);
+ oPS.close();
+
+ String sMsg = oBO.toString();
+ for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT))
+ { NotificationList oNL = new NotificationList(oCurr);
+ if (! oNL.isErr()) continue;
+ getNotifHandler().sendNotifications(oNL,sMsg);
+ }
+ }
+ catch (Exception e) { }
+ } //________________________________
+
+ private static InotificationHandler s_oNH;
+ private static final Object s_oSync = new Integer(0);
+/**
+ * Lazy instantiator of a InotificationHandler
+ * @return - a reference to an implementation of the interface or null if it
+ * can't be instantiated
+ */
+ protected static InotificationHandler getNotifHandler()
+ {
+ if (null!=s_oNH)
+ return s_oNH;
+ synchronized (s_oSync)
+ { if (null==s_oNH)
+ try { s_oNH = NotificationHandlerFactory.getNotifHandler
+ ("remote"
+ ,SystemProperties.getJndiServerType()
+ ,SystemProperties.getJndiServerURL()
+ );
+ }
+ catch (Exception e)
+ { Logger.getLogger(GpListener.class).error("Notification FAILED",e);
+ }
+ }
+ return s_oNH;
+ } //______________________________
+
+} //____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,9 +1,28 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
package org.jboss.soa.esb.listeners;
-import java.io.*;
import java.lang.reflect.*;
-import java.util.Map;
-import java.util.HashMap;
import java.util.Observer;
import java.util.Observable;
@@ -12,512 +31,171 @@
import javax.naming.*;
import javax.jms.*;
-import org.jboss.soa.esb.services.*;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.actions.AbstractAction;
import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.notification.*;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.processors.*;
-public class JmsQueueListener
+public class JmsQueueListener implements Runnable, Observer
{
- // You can override this value at constructor time of your
- // derived class after calling super(String)
- protected int m_iDfltReloadMillis= 180000 // default interval between
- // parameter reloading
+ // You can override these values at constructor time of your derived class
+ protected int
+ m_iSleepForThreads = 3000 // default sleep if no threads available
+ ,m_iUpperThreadLimit = 10 // just in case - override if you wish
;
-
- //attributes that need to be set in the xml parameter file for the service
- public static final String PARM_ACTION_CLASS = "actionClass";
-
- public static final String PARM_RELOAD_LTCY = "parmsReloadSecs";
-
- public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass";
- public static final String PARM_QUIESCE_TOPIC = "quiesceTopic";
- public static final String PARM_QUIESCE_SELECTOR = "quiesceSelector";
-
public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
+ public static final String LISTEN_JNDI_TYPE = "listenJndiType";
+ public static final String LISTEN_JNDI_URL = "listenJndiURL";
public static final String LISTEN_QUEUE = "listenQueue";
public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
- protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
- protected Map<String,GroupOfChilds> m_omChildPrc
- = new HashMap<String,GroupOfChilds>();
+ protected boolean m_bError = false;
+
+ protected QueueConnection m_oQconn;
+ protected QueueSession m_oQsess;
+ protected Queue m_oQueue;
+ protected String m_sSelector;
+ protected MessageConsumer m_oRdr;
+
+
+ protected int m_iQthr = 0, m_iMaxThr;
+
+ protected ThreadGroup m_oThrGrp = null;
- protected ParamRepository m_oParmRepos;
- protected String m_oParmName;
protected Logger m_oLogger;
+ protected GpListener m_oDad;
protected DomElement m_oParms;
- protected boolean m_bEndRequested;
-
- protected TopicConnection m_oTopicConn;
- protected TopicSession m_oSession;
- protected Topic m_oTopic;
- protected TopicSubscriber m_oTopicSubs;
-
- protected long m_lNextReload;
-
-
+ protected Class m_oExecClass;
- public JmsQueueListener(String p_sParamsUid) throws Exception
+ public JmsQueueListener(GpListener p_oDad, DomElement p_oParms) throws Exception
{
- m_oLogger = Util.getDefaultLogger(this.getClass());
-
- m_oParmRepos = ParamRepositoryFactory.getInstance();
- m_oParmName = p_sParamsUid;
- runUntilEndRequested();
+ m_oLogger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ m_oParms = p_oParms.cloneObj();
+ checkMyParms();
+ m_oThrGrp = new ThreadGroup(m_oParms.getName());
} //__________________________________
- protected void runUntilEndRequested() throws Exception
- { while (! m_bEndRequested)
- { try
- { String sMsg = (null == m_oParms)
- ? "Initial Parameter loading" : "Reloading Params";
- m_oLogger.debug(formatLogMsg(sMsg));
- m_oParms = DomElement.fromXml(m_oParmRepos.get(m_oParmName));
- }
- catch (Exception e)
- {
- StringBuilder sb = new StringBuilder ("Problems loading params ")
- .append(m_oParmName)
- .append((null==m_oParms)? " exiting..." : "continuing to use cached params")
- ;
- m_oLogger.error(formatLogMsg(sb.toString()));
- if (null==m_oParms)
- throw e;
- }
- quiesceTopicSubscribe();
- executeOneCycle();
- }
- } //__________________________________
-
- private void executeOneCycle() throws Exception
- {
- String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
- m_lNextReload = System.currentTimeMillis()
- + ( (null != sAtt)
- ? (1000 * Integer.parseInt(sAtt))
- : m_iDfltReloadMillis
- );
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception - if mandatory atts are not right
+ * or actionClass not in classpath
+ */
+ protected void checkMyParms() throws Exception
+ {
+ String sAtt = GpListener.obtainAtt(m_oParms
+ ,GpListener.PARM_ACTION_CLASS,null);
+ m_oExecClass= GpListener.checkActionClass(sAtt);
+
+ sAtt = GpListener.obtainAtt(m_oParms
+ ,GpListener.PARM_MAX_THREADS,"1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
- DomElement[] oaParms = m_oParms.getAllElemChildren();
+ // Third arg is null - Exception will br thrown if listenQueue is not found
+ String sQueue = GpListener.obtainAtt(m_oParms,LISTEN_QUEUE,null);
+
+ // Third arg is null - Exception will br thrown if actionClass is not found
+ String sAction = GpListener.obtainAtt
+ (m_oParms,GpListener.PARM_ACTION_CLASS,null);
+ GpListener.checkActionClass(sAction);
- boolean bFirst = true;
- while (System.currentTimeMillis() <= m_lNextReload)
- {
- for (DomElement oCurr : oaParms)
- {
- oneScan(oCurr, bFirst);
- }
- if (waitForQuiesce(5000))
- { m_bEndRequested = true;
- return;
- }
- bFirst = false;
- }
- } //_________________________________________
+ // No problem if selector is null - everything in queue will be returned
+ m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
- protected String formatLogMsg(String p_s)
- { return new StringBuilder("Processor '")
- .append(getClass().getSimpleName()).append("' <")
- .append(p_s).append(">")
- .toString();
- } //__________________________________
-
- private final void quiesceTopicSubscribe() throws JMSException, NamingException
- {
- try
- {
- m_oTopicConn = null;
- m_oSession = null;
- m_oTopic = null;
- m_oTopicSubs = null;
-
- String sStopTopic = m_oParms.getAttr(PARM_QUIESCE_TOPIC);
- if (Util.isNullString(sStopTopic))
- return;
- String sFactClass = m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
- if (Util.isNullString(sFactClass))
- sFactClass = "ConnectionFactory";
+ m_oQconn = null;
+ m_oQsess = null;
+ m_oQueue = null;
+
+ String sJndiType = GpListener.obtainAtt(m_oParms
+ ,LISTEN_JNDI_TYPE,"jboss");
+ String sJndiURL = GpListener.obtainAtt(m_oParms
+ ,LISTEN_JNDI_URL,"localhost");
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
- String sJndiType = SystemProperties.getJndiServerType();
- String sJndiURL = SystemProperties.getJndiServerURL();
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
- Object tmp = oJndiCtx.lookup(sFactClass);
- TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+ String sFactClass = GpListener.obtainAtt(m_oParms
+ ,LISTEN_QUEUE_CONN_FACT,"ConnectionFactory");
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
- m_oTopicConn = tcf.createTopicConnection();
- m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
- m_oSession = m_oTopicConn.createTopicSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oTopicConn.start();
-
- String sSelector = m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
- if (Util.isNullString(sSelector))
- sSelector = "processor='"+getClass().getSimpleName()+"'";
- m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
- }
- catch (Exception e)
- { m_oLogger.error("Problems connecting to JMS. ",e);
- }
-
- } //_________________________________________
-
- protected boolean waitForQuiesce(long p_lMillis) throws Exception
- {
- try
- { boolean bSleep = (null== m_oTopicSubs);
- Object oMsg = (bSleep) ? null : secureQuiesceReceive(p_lMillis);
-
- if (null!=oMsg)
- { m_oLogger.info("Starting Quiesce of "
- +getClass().getSimpleName());
- return true;
- }
- if (bSleep)
- Thread.sleep(p_lMillis);
- return false;
-
- }
- catch (Exception e)
- { m_oLogger.error("Problems with waitForQuiesce. ",e);
- Thread.sleep(p_lMillis);
- return false;
- }
- } //_________________________________________
-
- private Message secureQuiesceReceive(long p_lMillis) throws Exception
- {
- while (true)
- try
- { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); }
- catch (JMSException e)
- {
- // put here your recovery code
- return null;
- }
-
- } //_________________________________________
-
-
- protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
- {
- String sPrcName = p_oP.getName();
- if (!m_omChildPrc.containsKey(sPrcName))
- {
- ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
- int iMax = m_oThrGrp.enumerate(oaCh);
-
- ThreadGroup oThG = null;
- for (int i1 = 0; null == oThG && i1 < iMax; i1++)
- { if (m_oThrGrp.getName().equals(sPrcName))
- oThG = oaCh[i1];
- }
- if (null == oThG)
- oThG = new ThreadGroup(sPrcName);
- m_omChildPrc.put(sPrcName, newChildGroup(oThG));
- }
- GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
-
- if (null == oChildGrp) return;
- if (p_bFirst)
- oChildGrp.m_bError = false;
-
- try
- {
- oChildGrp.execute(p_oP);
- }
- catch (Exception e)
- {
- oChildGrp.m_bError = true;
- m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
- }
- } //_________________________________________
-
- protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
- { return new GroupOfChilds(pThG); }
-
- protected class GroupOfChilds implements Observer
- {
- public static final String PARM_MAX_THREADS = "maxThreads";
-
- protected ThreadGroup m_oThrGrp;
- protected boolean m_bError = false;
-
- protected Class m_oExecClass;
- protected DomElement m_oChParms;
-
- protected int m_iQthr = 0, m_iMaxThr;
- protected StringBuilder m_sb;
- protected int m_iSbIni;
-
- protected QueueConnection m_oQconn;
- protected QueueSession m_oQsess;
- protected Queue m_oQueue;
- protected String m_sSelector;
-
- protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
- {
- m_oThrGrp = p_oThrGrp;
- m_sb = new StringBuilder("GroupOfThreads ")
- .append(m_oThrGrp.getName()).append(" : ");
- m_iSbIni = m_sb.length();
- } //________________________________
-
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- {
- int iAdd = ((Integer) p_oUsrObj).intValue();
- m_iQthr += iAdd;
- }
- } //________________________________
-
- private void execute(DomElement p_oP) throws Exception
- {
- m_sb.setLength(m_iSbIni);
- if (m_bError)
- {
- m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
- .toString());
- return;
- }
- checkParms(p_oP);
- doYourJob (p_oP);
- } //________________________________
-
- protected void setMaxThreads(DomElement p_oP,int p_iMax)
- {
- String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
- m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
- m_iMaxThr = (m_iMaxThr < 1) ? 1
- : (m_iMaxThr > p_iMax) ? p_iMax
- : m_iMaxThr;
-
- } //________________________________
-
- protected Class checkActionClass(String p_sName) throws Exception
- {
- Class oCls;
- try { oCls = Class.forName(p_sName); }
- catch (ClassNotFoundException e)
- { throw new Exception(formatLogMsg("Class "+p_sName
- +" not found in classpath"));
- }
- try { oCls.getConstructor(new Class[] {DomElement.class}); }
- catch (NoSuchMethodException eN)
- { throw new Exception(formatLogMsg("No appropriate constructor "
- +p_sName+"(DomElement) found for class "));
- }
- return oCls;
- } //_________________________________________
-
- protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
- throws Exception
- {
- String sVal = p_oP.getAttr(p_sAtt);
- if (null==sVal)
- sVal = p_sDefault;
- if (Util.isNullString(sVal) && (null==p_sDefault))
- throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
-
- return sVal;
- } //________________________________
-
- protected void checkParms(DomElement p_oP) throws Exception
- {
- m_sb.setLength(m_iSbIni);
- m_oChParms = p_oP.cloneObj();
- m_oChParms.rmvChildsByName(AbstractProcessor.PARMS_THIS_INSTANCE);
- setMaxThreads(p_oP,10);
+ m_oQconn = qcf.createQueueConnection();
+ m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+ m_oQsess = m_oQconn.createQueueSession
+ (false,TopicSession.AUTO_ACKNOWLEDGE);
+ m_oQconn.start();
+ m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
- obtainAtt(p_oP,LISTEN_QUEUE,null);
-
- m_sSelector = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
-
- String sClass = obtainAtt(p_oP,PARM_ACTION_CLASS,null);
- if (Util.isNullString(sClass))
- throw new Exception(formatLogMsg("Missing value for "+PARM_ACTION_CLASS));
- m_oExecClass = Class.forName(sClass);
- Constructor m_oConstructor = m_oExecClass.getConstructor
- (new Class[] {DomElement.class});
- if (null==m_oConstructor)
- throw new Exception ("No constructor "+sClass+"(DomElement) found");
} //________________________________
- protected final void obtainQueue(DomElement p_oP) throws JMSException, NamingException
- {
- try
- {
- m_oQconn = null;
- m_oQsess = null;
- m_oQueue = null;
-
- String sQueue = p_oP.getAttr(LISTEN_QUEUE);
- String sFactClass = p_oP.getAttr(LISTEN_QUEUE_CONN_FACT);
- if (Util.isNullString(sFactClass))
- sFactClass = "ConnectionFactory";
-
- String sJndiType = SystemProperties.getJndiServerType();
- String sJndiURL = SystemProperties.getJndiServerURL();
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
- Object tmp = oJndiCtx.lookup(sFactClass);
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
-
- m_oQconn = qcf.createQueueConnection();
- m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
- m_oQsess = m_oQconn.createQueueSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oQconn.start();
-
- }
- catch (Exception e)
- { m_oLogger.error("Problems connecting to JMS. ",e);
- }
-
- } //_________________________________________
-
- protected void doYourJob(DomElement p_oP) throws Exception
+ /**
+ * Implement run method for this Runnable
+ * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
+ * no more looping allowed for all child classes
+ * <p/> This condition will not prevent child processes to finish normally
+ */
+ public void run()
{
- obtainQueue(p_oP);
- while(System.currentTimeMillis() < m_lNextReload)
- {
- if (m_iQthr >= m_iMaxThr)
- return;
+ while (m_oDad.continueLooping())
+ {
+ if (m_iQthr >= m_iMaxThr)
+ { m_oLogger.info("Waiting for available threads...");
+ try { Thread.sleep(m_iSleepForThreads); }
+ catch (InterruptedException e) {return; }
+ break;
+ }
+ Message oM = null;
+ try { oM = m_oRdr.receive(m_oDad.millisToWait()); }
+ catch (JMSException oJ)
+ {
+ m_oLogger.error("JMS error on receive",oJ);
+ for (int i1=0; i1<3; i1++)
+ try {checkMyParms(); } // try to reconnect to the queue
+ catch (Exception e)
+ { m_oLogger.error("Reconnecting to Queue",e);
+ try { Thread.sleep(m_iSleepForThreads); }
+ catch (InterruptedException e1)
+ { //Just return
+ return;
+ }
+ }
+ }
+ if (null==oM)
+ continue;
- MsgChildProcess oNew = getMsgChildProcess(this,p_oP);
- new Thread(m_oThrGrp,oNew).start();
- // Wait a little bit, so thread count will be updated
- // at some point in the past, this sleep was indispensable
- // new thread control classes in Java 5 might have solved the problem
- Thread.sleep(500);
- }
+ AbstractAction oExec = null;
+ try
+ { Constructor oConst = m_oExecClass
+ .getConstructor(GpListener.getActionClassArgs());
+ oExec = (AbstractAction)oConst.newInstance
+ (new Object[] {m_oParms,oM});
+ }
+ catch (Exception e)
+ { m_oLogger.error("Can't instantiate action class",e);
+ break;
+ }
+ // invoke the run method of the AbstractAction
+ m_iQthr += 1;
+ oExec.addObserver(this);
+ new Thread(oExec).start();
+ }
if (null!=m_oQsess)
try { m_oQsess.close(); }
- catch (Exception e1) {}
+ catch (Exception e1) {/* Tried my best - Just continue */}
if (null!=m_oQconn)
try { m_oQconn.close(); }
- catch (Exception e2) {}
-
- } //________________________________
-
- protected MsgChildProcess getMsgChildProcess
- (GroupOfChilds pDad, DomElement p_oP) throws Exception
- { return new MsgChildProcess (pDad,p_oP); }
-
- } //______________________________________________________
-
- protected class MsgChildProcess extends Observable implements Runnable
- {
- protected GroupOfChilds m_oParent; // you can always go there for common stuff
- protected DomElement m_oParms;
- protected MsgProcessor m_oExec;
-
- public MsgChildProcess(GroupOfChilds p_oGrp, DomElement p_oParms)
- throws Exception
- {
- m_oLogger.debug("Child "+p_oParms.getName());
- m_oParent = p_oGrp;
- m_oParms = p_oParms;
- this.addObserver(m_oParent);
- setChanged();
- // add 1 to child thread count
- notifyObservers(new Integer(1));
- } //__________________________________
-
- public void run()
- {
- Class[] oaArgs = {DomElement.class};
- MessageConsumer oReader = null;
- try
- {
- oReader = m_oParent.m_oQsess.createReceiver
- (m_oParent.m_oQueue, m_oParent.m_sSelector);
-
- long lSlack;
- while ((lSlack=m_lNextReload-System.currentTimeMillis()) > 0)
- {
- Message oMsg = (null==oReader) ? null
- : oReader.receive(lSlack);
- if (null==oMsg)
- continue;
-
- try
- {
- DomElement oParms = m_oParms.cloneObj();
- Constructor oCns = m_oParent.m_oExecClass.getConstructor(oaArgs);
- Object oInst = oCns.newInstance (new Object[] {oParms});
- m_oExec = (MsgProcessor)oInst;
- m_oExec.processMessage(oMsg);
- notifyOK();
- }
- catch (Exception e)
- { e.printStackTrace();
- notifyError(e);
- }
- }
- }
- catch (JMSException oJ)
- {
- m_oLogger.error(oJ);
- }
-
- setChanged();
- // decrease child thread count in parent group
- notifyObservers(new Integer(-1));
+ catch (Exception e2) {/* Tried my best - Just continue */}
} //______________________________
-
- public void notifyOK()
- { try
- {
- Serializable oNotif = m_oExec.getOkNotification();
- for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
- { NotificationList oNL = new NotificationList(oCurr);
- if (! oNL.isOK()) continue;
- getNotifHandler().sendNotifications(oCurr,oNotif);
- }
- }
- catch (Exception e) {}
- } //__________________________________
-
- public void notifyError(Exception p_e)
- {
- Serializable oNotif = (null==m_oExec)
- ? "No action class instantiated"
- : m_oExec.getErrorNotification();
- ByteArrayOutputStream oBO = new ByteArrayOutputStream();
- PrintStream oPS = new PrintStream(oBO);
- try
- { oPS.println(oNotif.toString());
- if (null != p_e) p_e.printStackTrace(oPS);
- oPS.close();
-
- String sMsg = oBO.toString();
- for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
- { NotificationList oNL = new NotificationList(oCurr);
- if (! oNL.isErr()) continue;
- getNotifHandler().sendNotifications(oNL,sMsg);
- }
- }
- catch (Exception e) { }
- } //__________________________________
-
- protected InotificationHandler getNotifHandler()
- {
- try { return NotificationHandlerFactory.getNotifHandler
- ("remote"
- ,SystemProperties.getJndiServerType()
- ,SystemProperties.getJndiServerURL()
- );
- }
- catch (Exception e)
- { m_oLogger.error(formatLogMsg("Notification FAILED"),e);
- return null;
- }
- } //______________________________
- } //______________________________________________________
+/**
+ * Implementation of Observer interface
+ * <p/> Just count the number of active child threads
+ *
+ */
+ public void update(Observable p_oObs, Object p_oUsrObj)
+ {
+ if (p_oUsrObj instanceof Integer)
+ m_iQthr += ((Integer) p_oUsrObj).intValue();
+ } //________________________________
+
} //____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -24,54 +24,24 @@
package org.jboss.soa.esb.listeners;
import java.util.*;
-import java.io.Serializable;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.*;
import java.sql.*;
import javax.sql.*;
-import org.jboss.soa.esb.services.InotificationHandler;
-import org.jboss.soa.esb.services.NotificationHandlerFactory;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.SystemProperties;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.helpers.persist.*;
-import org.jboss.soa.esb.notification.NotificationList;
-import org.jboss.soa.esb.processors.*;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.util.*;
/**
* SqlTablePoller class
*
- * the "main(args)" static method of this class will:
- * 1) load the parameters from the Name supplied in args[0] into a DomElement
- * (See org.jboss.soa.esb.parameters package for options on parameter repositories)
- * 2) for each child element (1st level) of the DomElement, it will try to initiate
- * a Thread group that can start a maximum of simultaneous child threads
- * "maxThreads" supplied in parameters - default=1
- * 3) each thread group will poll a SQL table with parameters defined in the corresponding
- * DomElement (example below has only 1 child element, but you can have several)
- * 4) Execution will orderly finish when a message is received in the "quiesceTopic"
- * with an optional "quiesceSelector" (please see AbstractPoller class constants)
- *
- * The SQL table(s) that is (are) polled should have
+ * The SQL table that is polled should have
* 1) a unique key (see "keyFields" parameter) that will be used to update status
* 2) a column to indicate the "processing status" of this trigger row (see ROW_STATE enum)
- * this column will be updated by the SqlChildProcess.run() method (see internal
- * protected class SqlChildProcess)
*
* Each retrieved row (see OPTIONAL_ATT.whereCondition) should be considered as a trigger
* that is intended to instantiate an object of "actionClass". The new instance will
- * receive the full DomElement (level 1 for each child group), with an added child
- * DomElement (see EsbAbstractProcessor.PARMS_THIS_INSTANCE) containing attributes
- * corresponding to the values of the "selectFields" columns in the row that triggered
- * the "actionClass"
+ * receive the full DomElement (level 1 for each child group)
*
- * The ZZDummyProcessor (in this package) is a trivial actionClass included with the
- * sole purpose of illustrating what the actionClass will receive, and the entry points
- * where users can insert their logic
- *
- * GOOD LUCK !!
- *
* @author Esteban Schifman
*/
public class SqlTablePoller extends AbstractPoller
@@ -80,12 +50,9 @@
*
<DocumentElementName>
<ExampleListenChapter
- pollLatencySecs="20"
- parmsReloadSecs="300"
-
maxThreads="2"
-
- actionClass="org.jboss.soa.esb.listeners.ZZDummyProcessor"
+ listenerClass="org.jboss.soa.esb.listeners.SqlTablePoller"
+ actionClass="org.jboss.soa.esb.actions.DummySqlRowAction"
driver-class="org.postgresql.Driver"
connection-url="jdbc:postgresql://myhost:5432/myDB"
@@ -173,152 +140,131 @@
};
public static final String DEFAULT_STATES = "PWED";
- protected Map<String,String> m_oVals = new HashMap<String,String>();
+ protected Map<String,String> m_oVals = new HashMap<String,String>();
+ protected String[] m_saCols ,m_saKeys;
+ protected String m_sUpdStates;
- /**
- * Main program for SqlTablePoller
- * @param args - String[] First parameter must be path to configuration XML file
- * @throws Exception
- */
- public static void main(String[] args) throws Exception
+ /**
+ * In this constructor you can override default values for the following protected base class values:
+ * <br/>
+ * <p/>m_iMinPollMillis : minimum polling interval (default 3000)
+ * <br/>m_iDfltPollMillis : default polling interval (default 20000)
+ * <br/>m_iSleepForThreads : how long to sleep if all configured threads are in use (default 3000)
+ * <br/>m_iUpperThreadLimit : max number of threads allowed (default 10)
+ * @param p_oDad GpListener - The controlling process
+ * @param p_oParms DomElement - Sub tree that corresponds to this instance
+ * @throws Exception
+ */
+ public SqlTablePoller(GpListener p_oDad, DomElement p_oParms) throws Exception
{
- new SqlTablePoller(args[0]);
- } //________________________________
-
-
- public SqlTablePoller(String p_sParamsUid) throws Exception
- {
- super(p_sParamsUid);
- m_iDfltReloadMillis = 180000;
- m_iDfltPollMillis = 20000;
- m_iMinPollMillis = 5000;
-// See superclass - It provides ability to request end by subscribing to a Topic
- runUntilEndRequested();
+ super(p_oDad,p_oParms);
+ try { checkMyParms(); }
+ catch (Exception e)
+ {
+ m_oLogger.error("checkMyParms() FAILED",e);
+ throw e;
+ }
} //__________________________________
-
- /**
- * Override this method if you wish to extend the SqlChildProcess class
- * (for example with ad-hoc notifications for OK list or Error list)
- * @param p_oParms = DomElement containing attributes of trigger
- * @return the SqlChildProcess to be started
- * @throws Exception
- */
- protected SqlChildProcess getSqlChildProcess
- (SqlPollerChildGroup pDad, DomElement p_oParms) throws Exception
- {
- return new SqlChildProcess(pDad, p_oParms);
- } //________________________________
-
- @Override
- protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
- {
- return new SqlPollerChildGroup(pThG);
- } //__________________________________
- protected class SqlPollerChildGroup extends AbstractPoller.GroupOfChilds
- {
- JdbcCleanConn m_oConn;
- String[] m_saKeys;
-
- protected SqlPollerChildGroup(ThreadGroup p_oThrGrp) throws Exception
- { super(p_oThrGrp);
+ private void checkAndStoreAtt(DomElement p_oP, String p_sName, String p_sDflt)
+ throws Exception
+ {
+ m_oVals.put(p_sName,GpListener.obtainAtt(p_oP,p_sName,p_sDflt));
} //________________________________
- protected void checkAtt(DomElement p_oP,String p_sAtt, String p_sDefault)
- throws Exception
- { m_oVals.put(p_sAtt,obtainAtt(p_oP,p_sAtt,p_sDefault));
- } //________________________________
-
- protected void checkParms(DomElement p_oP) throws Exception
+ protected void checkMyParms() throws Exception
{
- super.checkParms(p_oP);
-
- checkAtt(p_oP,SimpleDataSource.DRIVER ,null);
- checkAtt(p_oP,SimpleDataSource.URL ,null);
- checkAtt(p_oP,SimpleDataSource.USER ,"");
- checkAtt(p_oP,SimpleDataSource.PASSWORD ,"");
+ checkAndStoreAtt(m_oParms,SimpleDataSource.DRIVER ,null);
+ checkAndStoreAtt(m_oParms,SimpleDataSource.URL ,null);
+ checkAndStoreAtt(m_oParms,SimpleDataSource.USER ,"");
+ checkAndStoreAtt(m_oParms,SimpleDataSource.PASSWORD ,"");
for (TABLE_ATT oCurr : TABLE_ATT.values())
- checkAtt(p_oP,oCurr.toString(),null);
+ checkAndStoreAtt(m_oParms,oCurr.toString(),null);
- checkAtt(p_oP,OPTIONAL_ATT.whereCondition.toString(),"");
- checkAtt(p_oP,OPTIONAL_ATT.orderBy.toString(),"");
+ checkAndStoreAtt(m_oParms,OPTIONAL_ATT.whereCondition.toString(),"");
+ checkAndStoreAtt(m_oParms,OPTIONAL_ATT.orderBy.toString(),"");
String sAtt = OPTIONAL_ATT.inProcessVals.toString();
- checkAtt(p_oP,sAtt,DEFAULT_STATES);
- String sStates = m_oVals.get(sAtt);
- if (sStates.length()<4)
- throw new Exception(formatLogMsg
- ("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)"
- ));
+ checkAndStoreAtt(m_oParms,sAtt,DEFAULT_STATES);
+ m_sUpdStates = m_oVals.get(sAtt);
+ if (m_sUpdStates.length()<4)
+ throw new Exception("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)");
- checkAtt(p_oP,PARM_ACTION_CLASS,null);
- m_oExecClass = super.checkActionClass(m_oVals.get(PARM_ACTION_CLASS));
-
StringTokenizer ST = new StringTokenizer
(m_oVals.get(TABLE_ATT.selectFields.toString()),",");
+ m_saCols = new String[ST.countTokens()];
Set<String> oSelFlds = new HashSet<String>();
+ int iCurr = 0;
while (ST.hasMoreElements())
- oSelFlds.add(ST.nextToken().trim());
+ {
+ String sColName = ST.nextToken().trim();
+ m_saCols[iCurr++] = sColName;
+ oSelFlds.add(sColName);
+ }
ST = new StringTokenizer
(m_oVals.get(TABLE_ATT.keyFields.toString()),",");
m_saKeys = new String[ST.countTokens()];
if (m_saKeys.length < 1)
- throw new Exception(formatLogMsg("Empty list of keyFields"));
+ throw new Exception("Empty list of keyFields");
- for (int iCurr = 0; ST.hasMoreTokens(); iCurr++)
- { String sKey = ST.nextToken().trim();
- if (! oSelFlds.contains(sKey))
- throw new Exception(formatLogMsg("Key field <"+ sKey + "> must also be in select list"));
- m_saKeys[iCurr] = sKey;
+ for (iCurr = 0; ST.hasMoreTokens(); iCurr++)
+ { String sKeyCol = ST.nextToken().trim();
+ if (! oSelFlds.contains(sKeyCol))
+ throw new Exception("Key field <"+ sKeyCol + "> must also be in select list");
+ m_saKeys[iCurr] = sKeyCol;
}
} //________________________________
-
+
@Override
- protected void doYourJob(DomElement p_oP) throws Exception
+ protected Object preProcess(Object p_o) throws Exception
{
- for (DomElement oCurr : getTriggers())
- {
- if (m_iQthr >= m_iMaxThr)
- {
- m_oLogger.info(m_sb.append("Waiting for available threads").toString());
- Thread.sleep(5000);
- break;
- }
-
- SqlChildProcess oNew = getSqlChildProcess(this,oCurr);
- new Thread(m_oThrGrp,oNew).start();
- // Wait a little bit, so thread count will be updated
- // at some point in the past, this sleep was indispensable
- // new thread control classes in Java 5 might have solved the problem
- Thread.sleep(500);
- }
-
+ return p_o;
} //________________________________
-
- protected List<DomElement> getTriggers() throws Exception
- {
- JdbcCleanConn oConn = newDbConn();
- String sScan = scanStatement();
- PreparedStatement PS = oConn.prepareStatement(sScan);
- ResultSet RS = oConn.execQueryWait(PS,1);
+ @Override
+ protected List<Object> pollForCandidates()
+ {
+ String sSel4U = selectForUpdStatement();
+ String sUpdStmt = updateStatement();
+ JdbcCleanConn oConn = null;
+ List<Object> oResults = new ArrayList<Object>();
+ try
+ {
+ oConn = newDbConn();
+ String sScan = scanStatement();
- ResultSetMetaData oMeta = RS.getMetaData();
- String[] saColName = new String[oMeta.getColumnCount()];
- for (int i1=0; i1<saColName.length; i1++)
- saColName[i1] = oMeta.getColumnName(1+i1);
-
- List<DomElement> oResults = new ArrayList<DomElement>();
- while (RS.next())
- { DomElement oNew = new DomElement(AbstractProcessor.PARMS_THIS_INSTANCE);
- for (String sCurrCol : saColName)
- oNew.setAttr(sCurrCol,RS.getString(sCurrCol));
- oResults.add(oNew);
+ PreparedStatement PS = oConn.prepareStatement(sScan);
+ ResultSet RS = oConn.execQueryWait(PS,1);
+ while (RS.next())
+ { Map<String,Object> oColVals = new HashMap<String,Object>();
+ int iCurr = 0;
+ for (String sColName : m_saCols)
+ oColVals.put(sColName,RS.getObject(++iCurr));
+
+ // Set up the parameter object for the SqlRowAction
+ AbstractSqlRowAction.Params oActionP = new AbstractSqlRowAction.Params();
+ oActionP.omVals = oColVals;
+ oActionP.sUpdStates = m_sUpdStates;
+ oActionP.saCols = m_saCols;
+ oActionP.saKeys = m_saKeys;
+ oActionP.sSel4Upd = sSel4U;
+ oActionP.sUpdate = sUpdStmt;
+
+ oResults.add(oActionP);
+ }
}
- oConn.release();
+ catch (Exception e)
+ {
+ m_oLogger.warn("Some triggers might not have been returned",e);
+ }
+ finally
+ {
+ if (null!=oConn)
+ oConn.release();
+ }
return oResults;
} //________________________________
@@ -416,187 +362,4 @@
return sb.append(" for update").toString();
} //________________________________
- } //______________________________________________________
-
- /**
- * The child process group will try to start a new thread for each
- * element in the ResultSet returned by the scanStatement()
- * The controlling SqlPollerChildGroup will not start a thread if
- * "maxThreads" is exceeded
- * The idea is to obtain a new instance of the "actionClass" for each
- * row of the resultSet
- * Contents of the table field indicated by the "inProcessField" parameter are used
- * to control concurrency (see enum ROW_STATE for details)
- *
- * @author Esteban
- *
- */
- protected class SqlChildProcess extends Observable implements Runnable
- {
- protected SqlPollerChildGroup m_oParent; // you can always go there for common stuff
- protected DomElement m_oInstP; // values for this instance
- protected JdbcCleanConn m_oConn; // each child has it's own DB connection
- protected PreparedStatement m_PSsel4U ,m_PSupd;
- protected String m_sUpdStates;
- protected String getStatus(ROW_STATE p_oState)
- { int iPos = p_oState.ordinal();
- return m_sUpdStates.substring(iPos,++iPos);
- } //________________________________
-
- public SqlChildProcess(SqlPollerChildGroup p_oGrp, DomElement p_oP)
- throws Exception
- {
- m_oParent = p_oGrp;
- this.addObserver(m_oParent);
- setChanged();
- // add 1 to child thread count
- notifyObservers(new Integer(1));
-
- m_oInstP = p_oP;
- m_oConn = p_oGrp.newDbConn();
- m_sUpdStates = m_oVals.get(OPTIONAL_ATT.inProcessVals.toString());
- } //__________________________________
-
- public void run()
- {
- Exception oAbend = null;
- try
- { m_PSsel4U = m_oConn.prepareStatement(m_oParent.selectForUpdStatement());
- m_PSupd = m_oConn.prepareStatement(m_oParent.updateStatement());
-
- int iParm=1;
- for (String sCurr : m_oParent.m_saKeys)
- { String sVal = m_oInstP.getAttr(sCurr);
- m_PSsel4U.setString (iParm ,sVal);
- // parameters are +1 in update statement
- // autoincrement leaves things ready for next SQL parameter
- m_PSupd.setString (++iParm,sVal);
- }
- // will only continue if it can change status to "Working"
- if (! changeStatus(ROW_STATE.Pending,ROW_STATE.Working))
- oAbend = new Exception("Unable to change status to Working");
- }
- catch (Exception e)
- { m_oLogger.error("Problems with update statements",e);
- if (null!=m_oConn)
- { try { m_oConn.rollback(); }
- catch (Exception eR) { /* OK do nothing */}
- m_oConn.release();
- }
- oAbend = e;
- }
-
- if (null==oAbend)
- try
- {
- Constructor oCnst = m_oParent.m_oExecClass
- .getConstructor (new Class[] {DomElement.class});
- DomElement oParms = m_oParent.m_oChParms.cloneObj();
- oParms.addElemChild(m_oInstP);
- Object oInst = oCnst.newInstance (new Object[] {oParms});
- ((AbstractProcessor)oInst).execute();
- changeStatus(ROW_STATE.Working,ROW_STATE.Done);
- }
- catch (Exception e)
- { m_oLogger.error("run() FAILED",e);
- try
- { m_oConn.rollback();
- changeStatus(null,ROW_STATE.Error);
- }
- catch (Exception CS) { /* What could we do here ? */}
- oAbend = e;
- }
-
- finally
- { if (null!=m_oConn)
- m_oConn.release();
- }
-
- if (null==oAbend)
- notifyOK();
- else
- notifyError(oAbend);
-
- setChanged();
- // decrease child thread count in parent group
- notifyObservers(new Integer(-1));
- } //______________________________
-
- private boolean changeStatus (ROW_STATE pFrom, ROW_STATE pTo) throws Exception
- { ResultSet RS = m_oConn.execQueryWait(m_PSsel4U,5);
- if (! RS.next())
- return false;
- if (null!=pFrom)
- { String sOldStatus = RS.getString(1).substring(0,1);
- if (!sOldStatus.equalsIgnoreCase(getStatus(pFrom)))
- { m_oConn.rollback();
- return false;
- }
- }
- m_PSupd.setString(1,getStatus(pTo));
- m_oConn.execUpdWait(m_PSupd,5);
- m_oConn.commit();
-
- return true;
- } //______________________________
-
- public void notifyOK()
- { try
- {
- java.io.Serializable oNotif = getOkNotifContent();
- for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
- { NotificationList oNL = new NotificationList(oCurr);
- if (! oNL.isOK()) continue;
- getNotifHandler().sendNotifications(oCurr,oNotif);
- }
- }
- catch (Exception e) {}
- } //__________________________________
-
- public void notifyError(Exception p_e)
- {
- Serializable oNotif = getErrorNotifContent();
- ByteArrayOutputStream oBO = new ByteArrayOutputStream();
- PrintStream oPS = new PrintStream(oBO);
- try
- { oPS.println(oNotif);
- if (null != p_e) p_e.printStackTrace(oPS);
- oPS.close();
-
- String sMsg = oBO.toString();
- for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
- { NotificationList oNL = new NotificationList(oCurr);
- if (! oNL.isErr()) continue;
- getNotifHandler().sendNotifications(oNL,sMsg);
- }
- }
- catch (Exception e) { }
- } //__________________________________
-
- protected InotificationHandler getNotifHandler()
- {
- try { return NotificationHandlerFactory.getNotifHandler
- ("remote"
- ,SystemProperties.getJndiServerType()
- ,SystemProperties.getJndiServerURL()
- );
- }
- catch (Exception e)
- { m_oLogger.error(formatLogMsg("Notification FAILED"),e);
- return null;
- }
- } //______________________________
-
- // These methods to be overriden by you own derived class
- protected Serializable getOkNotifContent()
- {
- return "Success";
- }
- protected Serializable getErrorNotifContent()
- {
- return "FAILURE";
- }
-
- } //______________________________________________________
-
} //____________________________________________________________________________
Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZDummyProcessor.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,25 +0,0 @@
-package org.jboss.soa.esb.listeners;
-
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.processors.*;
-
-public class ZZDummyProcessor extends AbstractProcessor
-{
- public ZZDummyProcessor(DomElement p_oParms) throws Exception
- { super(p_oParms);
- }
-
- @Override
- public void execute() throws Exception
- {
- System.out.println(getClass().getName()+" "
- + m_oParms.toString()
- );
- }
-
- @Override
- protected void checkParms() throws Exception
- {
- }
-
-} //____________________________________________________________________________
Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java 2006-08-08 02:13:25 UTC (rev 5589)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ZZderivedSqlTablePoller.java 2006-08-08 02:20:20 UTC (rev 5590)
@@ -1,53 +0,0 @@
-package org.jboss.soa.esb.listeners;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.jboss.soa.esb.helpers.*;
-
-public class ZZderivedSqlTablePoller extends SqlTablePoller
-{
- public static void main(String[] args) throws Exception
- { new ZZderivedSqlTablePoller(args[0]); }
-
- public ZZderivedSqlTablePoller(String pParamsUid) throws Exception
- { super(pParamsUid);
- } //________________________________
-
- protected SqlChildProcess getSqlChildProcess
- (SqlPollerChildGroup pDad, DomElement p_oParms) throws Exception
- {
- return new ZZChildProcess(pDad, p_oParms);
- } //________________________________
-
- private static final SimpleDateFormat s_oFmt
- = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.mmm");
-
- protected class ZZChildProcess extends SqlChildProcess
- {
- public ZZChildProcess(SqlPollerChildGroup p_oGrp, DomElement p_oP)
- throws Exception
- { super(p_oGrp,p_oP); }
-
- protected String getMsgPfx()
- { return ZZderivedSqlTablePoller.class.getSimpleName()
- +" "+s_oFmt.format(new Date(System.currentTimeMillis()))+" ";
- }
-
- // object m_oInstP contains parameters unique to this instance
- // object m_oParent.m_oChParms contains parameters common to all child threads
- // of the parent child group
- protected String getOkNotifContent()
- {
- return getMsgPfx()+" OK "+m_oInstP.toString();
- }
-
- protected String getErrorNotifContent()
- {
- return getMsgPfx()+" eeeeerrrrrrrrr "+m_oInstP.toString();
- }
-
- } //__________________________________________________
-
-
-} //____________________________________________________________________________
More information about the jboss-svn-commits
mailing list