[jboss-svn-commits] JBL Code SVN: r6808 - in labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb: . actions message message/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Oct 14 18:07:26 EDT 2006
Author: estebanschifman
Date: 2006-10-14 18:07:21 -0400 (Sat, 14 Oct 2006)
New Revision: 6808
Added:
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java
Modified:
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
Log:
Initial cut of new 'Message aware' listeners and actions
Modified: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -22,6 +22,7 @@
package org.jboss.soa.esb.actions;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Body;
/**
@@ -31,8 +32,9 @@
*/
public class ActionUtils
{
- public static final String BEFORE_ACTION="org.jboss.soa.esb.actions.current.before";
- public static final String CURRENT_OBJECT="org.jboss.soa.esb.actions.current.after";
+ public static final String BEFORE_ACTION="org.jboss.soa.esb.actions.current.before";
+ private static final String CURRENT_OBJECT="org.jboss.soa.esb.actions.current.after";
+ private static final String CONFIG_TREE ="org.jboss.soa.esb.helpers.ConfigTree";
/**
* Obtain the current object from standard spot within message
@@ -76,4 +78,31 @@
message.getBody().add(CURRENT_OBJECT,obj);
return oRet;
}
+
+ /**
+ * Replace the ConfigTree kept under the CONFIG_TREE key of the message body
+ * <br/>Any existing entry is removed first; a null tree is never stored
+ * @param message - Message whose body is updated
+ * @param tree - ConfigTree to store - nothing is added when it is null
+ * @return ConfigTree that was removed from the body (whatever remove returned)
+ */
+ public static ConfigTree setConfigTree(Message message, ConfigTree tree)
+ {
+ final Object removed = message.getBody().remove(CONFIG_TREE);
+ final ConfigTree previous = (ConfigTree) removed;
+ if (tree != null) { message.getBody().add(CONFIG_TREE, tree); }
+ return previous;
+ }
+ /**
+ * Obtain the ConfigTree stored under the CONFIG_TREE key of the message body
+ * <br/>Never returns null - see the return description
+ * @param message - to read the ConfigTree from
+ * @return ConfigTree found in the body, or a new ConfigTree("mock") if
+ * none is stored - NOTE(review): placeholder default looks temporary, confirm
+ */
+ public static ConfigTree getConfigTree(Message message)
+ {
+ ConfigTree oRet = (ConfigTree)message.getBody().get(CONFIG_TREE);
+ return (null==oRet)?new ConfigTree("mock"):oRet;
+ }
}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,148 @@
+package org.jboss.soa.esb.message.listeners;
+
+import java.lang.reflect.*;
+
+import javax.enterprise.deploy.spi.exceptions.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.jboss.remoting.samples.chat.exceptions.InvalidArgumentException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+
+/**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs a list of action classes on a message
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+class ActionProcessingPipeline implements Runnable
+{
+
+ /**
+ * Build a pipeline for a message, reading the action list from its ConfigTree
+ * @param message Message - The initial message to be run through the action class chain
+ * @throws ConfigurationException if the ConfigTree has no "action" children
+ */
+ public ActionProcessingPipeline(Message message)
+ throws InvalidArgumentException, ConfigurationException
+ {
+ // Test the argument: the _message field is still unset (null) at this point
+ if (null==message)
+ throw new IllegalArgumentException("Message must be not null");
+ _message = message;
+ _config = ActionUtils.getConfigTree(_message);
+ if (null==_config) throw new IllegalArgumentException("Configuration needed for action classes");
+ _actionList = _config.getChildren("action");
+ if (null==_actionList || _actionList.length<1) throw new ConfigurationException("No 'action' elements found in configuration");
+ }
+
+ /**
+ * Accessor for the _message field
+ * @return Message - current Message of this action chain processor
+ */
+ public Message getMessage() { return _message; }
+
+ /** Instantiate each configured action class in turn and invoke its process(Message) method
+ * @see java.lang.Runnable#run()
+ */
+ public void run()
+ {
+ try
+ {
+ // Run the message through each ActionProcessor...
+ _currentIndex = -1;
+ for(ConfigTree oCurr : _actionList)
+ {
+ _currentIndex++;
+ _currentAction = oCurr.getAttribute("class");
+ Class oClass = Class.forName(_currentAction);
+ Constructor oConst = oClass.getConstructor(new Class[] {ConfigTree.class});
+ Object currentProcessor = oConst.newInstance(_config);
+ Method method = oClass.getMethod("process",new Class[] {Message.class});
+ // The processing result of each action feeds into the processing of the next action...
+ try
+ {
+ // copy currentObject in Message body to 'previous' currentObject
+ ActionUtils.copyCurrentToPrevious(_message);
+ Message next = (Message)method.invoke(currentProcessor,new Object[] {_message} );
+ if(next==null)
+ {
+ _logger.error(prematureTermination("returned <null> - Cannot continue"));
+ return;
+ }
+ _message = next; // the result of this action is the input of the next one
+ actionClassFinishedOk(next);
+ }
+ catch (ClassCastException eCast)
+ {
+ // If action class returns non Message, log and abort chain
+ _logger.error(prematureTermination("returned a non Message Object)"));
+ return;
+ }
+ catch (Exception e)
+ {
+ // If action class threw exception, log and abort chain
+ actionClassException(e);
+ return;
+ }
+ }
+ actionChainFinishedOk();
+ }
+ catch(ClassNotFoundException e)
+ { _logger.error(prematureTermination("action class is not in path"),e); }
+ catch(NoSuchMethodException e)
+ { _logger.error(prematureTermination("method not found"),e); }
+ catch(InstantiationException e)
+ { _logger.error(prematureTermination("cannot instantiate action class"),e); }
+ catch(InvocationTargetException e)
+ { _logger.error(prematureTermination("action class constructor threw an exception"),e); }
+ catch(IllegalAccessException e)
+ { _logger.error(prematureTermination("unable to access method"),e); }
+ catch (IllegalArgumentException e)
+ { _logger.error(prematureTermination("illegal argument instantiating action class"),e); }
+ }
+
+ protected String prematureTermination(String s)
+ {
+ // Builds the log text; Arrays.toString prints the names as "[a, b]" (a bare String[] would print its identity hash)
+ return new StringBuilder("Premature termination of action processing pipeline ")
+ .append(java.util.Arrays.toString(getActionNames()))
+ .append(". ActionProcessor [").append(_currentAction)
+ .append("] ").append(s)
+ .toString();
+ }
+
+ protected void actionClassException(Throwable thr)
+ {
+ _logger.error(prematureTermination("threw an exception"), thr); // same logger as the other failure paths, not stderr
+ }
+
+ protected void actionClassFinishedOk(Message message)
+ { // Empty hook: run() calls it with the Message returned by an action's process method
+ }
+
+ protected void actionChainFinishedOk()
+ { // Empty hook: run() calls it once the loop over all actions has completed
+ }
+
+ protected String[] getActionNames()
+ {
+ String[] sa = new String[_actionList.length];
+ int i1=0;
+ for (ConfigTree oCurr : _actionList)
+ sa[i1++]=oCurr.getAttribute("name");
+ return sa;
+ }
+
+ protected ConfigTree[] _actionList;
+ protected int _currentIndex;
+ protected String _currentAction;
+ protected Message _message;
+ protected ConfigTree _config;
+ protected Logger _logger = Logger.getLogger(this.getClass());
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,660 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.notification.NotificationList;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
+import org.jboss.soa.esb.services.NotificationManager;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+/**
+ * Controlling class that will launch listener child threads for supported
+ * transport listener classes, as indicated in the configuration XML tree
+ * pointed by arg[0]
+ *
+ * <p />
+ * Can be launched as uppermost controller (it has a main(args) method)
+ * <p />
+ * Also implements Runnable, and can thus be launched in a child thread from an
+ * upper controlling process
+ * <p />
+ * Listens on a JMS queue (with an optional message selector) for commands (e.g.
+ * Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p />
+ * Parameter reloading can also be set using the PARM_RELOAD_SECS attribute
+ * <p />
+ * End time for this instance can also be set using the PARM_END_TIME attribute
+ * <p />
+ *
+ * @author Esteban
+ *
+ */
+public class EsbListenerController implements Runnable {
+
+ private static Logger m_oLogger = Logger.getLogger(EsbListenerController.class);
+
+ public static void main(String[] args) throws Exception {
+ EsbListenerController oProc = new EsbListenerController(args[0]);
+ oProc.run();
+ EsbListenerController.State oS = oProc.getState();
+
+ if (null != oS.getException()) {
+ m_oLogger.error("EsbListenerController <" + args[0] + "> FAILED\n", oS
+ .getException());
+ }
+ System.exit(oS.getCompletionCode());
+ } // ________________________________
+
+ protected int m_iDfltReloadMillis = 180000 // default interval between
+ // parameter reloads
+ ;
+
+ public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
+ public static final String PARM_END_TIME = "endTime";
+
+ // Attribute name that denotes listener class to be instantiated in a child
+ // thread
+ // This attribute is not in the root node but in first level child
+ // DomElements
+ public static final String PARM_LISTENER_CLASS = "listenerClass";
+ public static final String PARM_ACTIONS = "actions";
+ public static final String PARM_MAX_THREADS = "maxThreads";
+ public static final String CHLD_EMAIL_PARMS = "EmailProperties";
+
+ private String m_sParmsName;
+ private DomElement m_oParms;
+
+ private HashMap<String, Object> m_oAtts;
+
+ /**
+ * Obtain a shallow copy of needed attributes in this object's last loaded
+ * parameter tree <p/>The local object is cloned so child threads can use it
+ * as they choose to without interfering with the environment
+ * <p />
+ * Listener processes controlled by this object should keep a reference to
+ * this object at construction time, and not call this method again unless
+ * they specifically need updated values. Parameter reload could have
+ * happened since last call
+ * <p />m_oAtts may still be null, hence the guard instead of a bare clone()
+ * @return Map - a shallow copy of the attributes Map (empty if none loaded)
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getControllerAttributes() {
+ return (null == m_oAtts) ? new HashMap<String, Object>() : (Map<String, Object>) m_oAtts.clone();
+ }
+
+ private boolean m_bReloadRequested, m_bEndRequested;
+
+ private long m_lNextReload = Long.MAX_VALUE;
+
+ private long m_lEndTime = Long.MAX_VALUE;
+
+ public static final SimpleDateFormat s_oDateParse = new SimpleDateFormat(
+ "yyyyMMdd HH:mm:ss"); // HH = hour of day 0-23; "hh" (1-12) parsed 12:xx:xx as 00:xx:xx
+
+ private State m_oState = null;
+
+ public State getState() {
+ return m_oState; // null until a constructor taking arguments or run() assigns it
+ }
+
+ public static enum State {
+ Loading_parameters, Running, Shutting_down, Done_OK, Exception_thrown;
+ int m_iCompletionCode = 0; // field of the enum constant, so shared by every user of that constant
+
+ Exception m_oException = null; // field of the enum constant, shared the same way
+
+ public int getCompletionCode() {
+ return m_iCompletionCode;
+ };
+
+ public Exception getException() {
+ return m_oException;
+ }
+ };
+
+ private CommandQueue commandQueue;
+
+ private ActionDefinitionFactory actionDefinitionFactory;
+
+ private static CommandQueue defaultCommandQueue = null;
+
+ /**
+ * Protected default constructor - performs no initialisation.
+ */
+ protected EsbListenerController() {
+ }
+
+ /**
+ * Construct a Listener Manager from the named repository based
+ * configuration.
+ *
+ * @param p_sParameterName
+ * Name of the Repository entry containing the configuration.
+ * @throws Exception
+ * Unable to load/use the named configuration.
+ */
+ public EsbListenerController(String p_sParameterName) throws Exception {
+ this(EsbListenerController.getListenerConfig(p_sParameterName));
+ m_sParmsName = p_sParameterName; // kept so run() can reload the configuration by name
+ }
+
+ /**
+ * Construct a Listener Manager using the specified listener configuration.
+ *
+ * @param config
+ * The configuration; checked by checkParms before use.
+ * @throws Exception
+ * Unable to load/use the supplied configuration (logged, then rethrown).
+ */
+ public EsbListenerController(DomElement config) throws Exception {
+ m_oParms = config;
+ m_oState = State.Loading_parameters;
+
+ try {
+ checkParms(m_oParms);
+ setEmailSystemProperties();
+ } catch (Exception e) {
+ String configSource = config.getAttr("configSource"); // set by getListenerConfig
+
+ m_oState = State.Exception_thrown;
+ m_oState.m_oException = e; // stored on the enum constant itself
+ m_oLogger.fatal("Listener configuration and startup error. Config Source: "
+ + (configSource != null ? configSource
+ : "unknown"), e);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Load the named listener configuration from the configured parameter
+ * repository.
+ *
+ * @param reposParam
+ * The name of the repository entry containing the Listener
+ * configuration.
+ * @return Listener Configuration as {@link DomElement}.
+ * @throws IOException
+ * Unable to access the repository.
+ * @throws ParamRepositoryException
+ * Unable to access the configuration in the repository.
+ * @throws SAXException
+ * Unable to parse the configuration.
+ */
+ private static DomElement getListenerConfig(String reposParam)
+ throws IOException, ParamRepositoryException, SAXException {
+ String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+ DomElement config = DomElement.fromXml(sXml);
+
+ config.setAttr("configSource", "param-repository:" + reposParam);
+
+ return config;
+ }
+
+ /**
+ * Check to see if all needed parameters are there, and assign default
+ * values to some of them
+ *
+ * @param p_oP
+ * DomElement - Where to look for the mandatory/optional
+ * configuration attributes
+ * @throws Exception -
+ * If attributes are wrong or not enough for a proper runtime
+ * configuration
+ */
+ public void checkParms(DomElement p_oP) throws Exception {
+ // We've just loaded - set to false until next reload requested
+ m_bReloadRequested = false;
+ commandQueue = createCommandQueue(p_oP);
+ // NOTE(review): run() calls this again on reload; a previously opened queue is replaced here without close()
+ // Open the command queue...
+ if (null!=commandQueue)
+ commandQueue.open(p_oP);
+
+ // Next reload: PARM_RELOAD_SECS from now when set; otherwise never when
+ // there is no command queue, or m_iDfltReloadMillis from now when there is.
+ // NOTE(review): the earlier comment described the opposite choice - confirm intent
+ String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
+ m_lNextReload = (null != sRldSecs)
+ ? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
+ : (null == commandQueue)
+ ? Long.MAX_VALUE
+ : System.currentTimeMillis() + m_iDfltReloadMillis;
+
+ // if PARM_END_TIME not set try to run forever
+ // not a good practice if command queue is not set
+ // The value is parsed with s_oDateParse
+ String sEndT = p_oP.getAttr(PARM_END_TIME);
+ m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+ sEndT).getTime();
+
+ // Read and initialise the action definitions...
+ DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+ if(actionConfig == null) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+ actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
+
+ } // ________________________________
+
+ /**
+ * Factory method for creating the command queue.
+ * @param config EsbListenerController config.
+ * @return EsbListenerController CommandQueue instance.
+ */
+ private CommandQueue createCommandQueue(DomElement config) {
+ String commandQueueClass = config.getAttr("command-queue-class");
+
+ if(commandQueueClass != null) {
+ try {
+ return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+ } catch (Exception e) {
+ m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
+ }
+ }
+
+ return defaultCommandQueue;
+ }
+
+ /**
+ * Allows a default command queue to be set statically for all EsbListenerController instances.
+ * @param defaultCommandQueue The defaultCommandQueue to set.
+ */
+ public static void setDefaultCommandQueue(CommandQueue defaultCommandQueue) {
+ EsbListenerController.defaultCommandQueue = defaultCommandQueue; // returned by createCommandQueue() as its fallback
+ }
+
+ /**
+ * Main execution loop <p/> Will continue to run until either <p/>a) run
+ * time is expired <p/>b) quiesce command is received in command queue
+ * <p/>For every child element that contains a PARM_LISTENER_CLASS
+ * attribute, this method will try to launch a child thread instantiating an
+ * object of that class, and will call its run() method <p/>Once all child
+ * processes are triggered, the main thread will either <p/>1) wait for a
+ * message in the command queue (if one was configured) until next reload or
+ * end of run period expired <p/>or 2) Just sleep if there's no command
+ * queue to listen on
+ */
+ public void run() {
+ while (endNotRequested()) {
+ m_oState = State.Running;
+ for (DomElement oCurr : m_oParms.getAllElemChildren()) {
+ String sClass = oCurr.getAttr(PARM_LISTENER_CLASS);
+ if (Util.isNullString(sClass))
+ continue;
+ tryToLaunchChildListener(oCurr, sClass);
+ }
+
+ waitForCmdOrSleep();
+
+ if (endRequested()) {
+ break;
+ }
+ if (m_sParmsName != null && timeToReload()) {
+ try {
+ m_oState = State.Loading_parameters;
+ m_oLogger
+ .info("Reloading parameters _____________________________________________________");
+ DomElement oNew = EsbListenerController.getListenerConfig(m_sParmsName);
+ checkParms(oNew);
+ m_oParms = oNew;
+ setEmailSystemProperties();
+ } catch (Exception e) {
+ m_oLogger.error("Failed to reload parameters"
+ + " - Continuing with cached version", e);
+ }
+ }
+ }
+ m_oState = State.Done_OK;
+ m_oState.m_iCompletionCode = 0;
+ m_oLogger
+ .info("Finishing_____________________________________________________");
+
+ // Close the command queue, if any - it is null when none was configured
+ if (null != commandQueue) {
+ try {
+ commandQueue.close();
+ } catch (CommandQueueException e) {
+ m_oLogger.error("Error closing Command Queue.", e);
+ }
+ }
+ } // ________________________________
+
+ private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
+ try {
+ Class oListener = Class.forName(p_sClassName);
+ // Look the constructor up by the declared controller type: getConstructor needs
+ // an exact parameter-type match, which this.getClass() breaks for subclasses
+ Constructor oConst = oListener.getConstructor(new Class[] { EsbListenerController.class, DomElement.class, ActionDefinitionFactory.class });
+ Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this, p_oP, actionDefinitionFactory });
+ new Thread(oRun).start();
+ } catch (Exception e) {
+ m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ }
+ } // ________________________________
+
+ long millisToWait() {
+ return Math.min(m_lNextReload, m_lEndTime) - System.currentTimeMillis(); // <= 0 once the earlier deadline has passed
+ } // ________________________________
+
+ private void waitForCmdOrSleep() {
+ long lToGo = millisToWait();
+
+ if (null == commandQueue) {
+ m_oLogger.debug("About to sleep " + lToGo);
+ // No command queue nor topic - just sleep until time exhausted or thread interrupted
+ try {
+ if (lToGo > 0)
+ Thread.sleep(lToGo);
+ } catch (InterruptedException e) {
+ m_lEndTime = 0; // mark as end requested and return
+ Thread.currentThread().interrupt(); // restore the flag cleared by the exception
+ }
+ return;
+ }
+
+ // Wait for commands until time exhausted or command received
+ // Note that received commands might change time variables (reload/end)
+ // that's why time to go is recalculated on each cycle
+ while ((lToGo = millisToWait()) > 0) {
+ try {
+ m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
+ return;
+ }
+ processCommand(oM);
+ if (endRequested() || timeToReload()) {
+ break;
+ }
+ } catch (CommandQueueException eJ) {
+ m_oLogger.info("receive on command queue failed", eJ);
+ }
+ }
+ } // ________________________________
+
+ /**
+ * Processes the command that has been received in the command queue (or
+ * topic) <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be
+ * changed
+ *
+ * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+ * width="400"/> </COLGROUP>
+ * <TR>
+ * <TD align="center">message text</TD>
+ * <TD align="center">effect</TD>
+ * </TR>
+ * <TR>
+ * <TD>shutdown*</TD>
+ * <TD>End time will be immediately set to 'now' - quiesce process will
+ * start - Child threads will be allowed to finish normally</TD>
+ * </TR>
+ * <TR>
+ * <TD>reload param*</TD>
+ * <TD>Parameters will be immediately reloaded, and listener reconfigured
+ * with new values</TD>
+ * </TR>
+ * <TR>
+ * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+ * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+ * end of day assumed (23:59:59)</TD>
+ * </TR>
+ * </TABLE> * startsWith() <p/>
+ *
+ * @param sTxt
+ * Command text received from the command queue.
+ *
+ */
+ private void processCommand(String sTxt) {
+ if (null == sTxt)
+ return;
+
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ m_bEndRequested = true;
+ m_oLogger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param")) {
+ m_bReloadRequested = true;
+ m_oLogger
+ .info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+ m_oLogger.info("New end date set to : " + oEnd);
+ m_lEndTime = oEnd.getTime();
+ } catch (Exception eDat) {
+ m_oLogger.info("Problems with endTime command", eDat);
+ }
+ }
+ } // ________________________________
+
+ /**
+ * Accessor to determine if execution time is expired or shutdown requested
+ *
+ * @return boolean if processing has to stop (all child threads will be
+ * allowed to finish)
+ */
+ public boolean endRequested() { // NOTE(review): fields read here are not volatile - confirm visibility if child threads call this
+ return m_bEndRequested || System.currentTimeMillis() >= m_lEndTime;
+ }
+
+ /**
+ * Accessor to determine if execution time is not expired, and no shutdown
+ * request received
+ *
+ * @return boolean - true if run time has not expired and quiesce has not
+ * been requested
+ */
+ public boolean endNotRequested() {
+ return !endRequested(); // plain negation - evaluates the clock at call time
+ }
+
+ /**
+ * Provide a common accessor to determine if parameters have to be reloaded
+ * <p/> For child threads this means thread execution has to end
+ * </p>
+ * Child processes should only call this method when they are idle (as
+ * opposed to in the middle of executing a unit of work)
+ *
+ * @return boolean - true if it's time to reload parameters
+ */
+ public boolean timeToReload() { // true on an explicit reload request or once m_lNextReload has passed
+ return m_bReloadRequested
+ || System.currentTimeMillis() >= m_lNextReload;
+ }
+
+ /**
+ * Helper accessor for child processes that provides info to determine if
+ * they can continue with yet another execution cycle
+ *
+ * @return boolean - true if runtime is not expired and not time yet to
+ * reload parameters
+ */
+ public boolean continueLooping() {
+ return (endNotRequested() && !timeToReload()); // both conditions are re-evaluated on every call
+ } // ________________________________
+
+ private static final String[] s_saMailProps = { Environment.SMTP_HOST,
+ Environment.SMTP_USERNAME, Environment.SMTP_PASSWORD,
+ Environment.SMTP_PORT, Environment.SMTP_FROM,
+ Environment.SMTP_AUTH };
+
+ private void setEmailSystemProperties() {
+ DomElement oEmail = m_oParms.getFirstElementChild(CHLD_EMAIL_PARMS);
+ if (null != oEmail)
+ for (String sCurr : s_saMailProps) {
+ String sProp = oEmail.getAttr(sCurr);
+ if (null != sProp)
+ ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).setProperty(sCurr, sProp);
+ }
+ } // ________________________________
+
+ /**
+ * Read an attribute from an element, falling back to a caller supplied default
+ *
+ * @param p_oP
+ * DomElement - only this Element's own attributes are consulted
+ * @param p_sAtt
+ * String - Name of attribute to read
+ * @param p_sDefault
+ * String - fallback used when the attribute is absent (may be null)
+ * @return String - the attribute value when present, otherwise the default
+ * @throws ConfigurationException -
+ * If the attribute is absent and the supplied default is
+ * null as well
+ */
+ public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+ throws ConfigurationException {
+ final String sFound = p_oP.getAttr(p_sAtt);
+ if (sFound != null) return sFound;
+ if (p_sDefault != null) return p_sDefault;
+ // Neither a configured value nor a fallback is available
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+ } // ________________________________
+
+ /**
+ * Find child nodes named "NotificationList" that contain an attribute
+ * 'type' that starts with "ok" (case insensitive)
+ *
+ * @param p_oP -
+ * DomElement to search for "NotificationList" child Elements
+ * @param p_oSer
+ * Serializable - Will constitute the body of the notification
+ */
+ public static void notifyOK(DomElement p_oP, Serializable p_oSer) {
+ if(p_oSer == null) {
+ return;
+ }
+ try {
+ for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT)) {
+ NotificationList oNL = new NotificationList(oCurr);
+ if (!oNL.isOK())
+ continue;
+ getNotifHandler().sendNotifications(oCurr, p_oSer);
+ }
+ } catch (Exception e) {
+ // Notification stays best-effort (nothing is rethrown) but failures are no longer silent;
+ // this also covers the NullPointerException raised when getNotifHandler() returns null
+ m_oLogger.error("Failed to send OK notifications", e);
+ }
+ } // __________________________________
+
+ /**
+ * Find child nodes named "NotificationList" that contain an attribute
+ * 'type' that starts with "err" (case insensitive) or no 'type' attribute
+ * set
+ *
+ * @param p_oP -
+ * DomElement to search for "NotificationList" child Elements
+ * @param p_e -
+ * Exception if not null, will be appended to the body
+ * @param p_oSer
+ * Serializable - Will be included at the beginning of the body
+ * of the notification
+ */
+ public static void notifyError(DomElement p_oP, Exception p_e, Serializable p_oSer) {
+ if(p_oSer == null) {
+ return;
+ }
+
+ ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+ PrintStream oPS = new PrintStream(oBO);
+ try {
+ // Body = text of p_oSer followed by the stack trace of p_e (when supplied)
+ oPS.println(p_oSer.toString());
+ if (null != p_e)
+ p_e.printStackTrace(oPS);
+ oPS.close();
+ String sMsg = oBO.toString();
+ for (DomElement oCurr : p_oP.getElementChildren(NotificationList.ELEMENT)) {
+ NotificationList oNL = new NotificationList(oCurr);
+ if (!oNL.isErr())
+ continue;
+ getNotifHandler().sendNotifications(oNL, sMsg);
+ }
+ } catch (Exception e) {
+ // Best-effort: nothing is rethrown, but the failure is logged instead of vanishing
+ m_oLogger.error("Failed to send error notifications", e);
+ }
+ } // ________________________________
+
+ private static volatile NotificationManager s_oNH; // volatile: getNotifHandler() reads it outside the lock
+
+ private static final Object s_oSync = new Object();
+
+ /**
+ * Lazy instantiator of a NotificationManager
+ *
+ * @return - a reference to an implementation of the interface or null if it
+ * can't be instantiated
+ */
+ protected static NotificationManager getNotifHandler() {
+ if (null != s_oNH)
+ return s_oNH;
+ synchronized (s_oSync) {
+ if (null == s_oNH)
+ try {
+ s_oNH = NotificationHandlerFactory.getNotifHandler(
+ "remote", Configuration.getJndiServerType(),
+ Configuration.getJndiServerURL());
+ } catch (Exception e) {
+ Logger.getLogger(EsbListenerController.class).error(
+ "Notification FAILED", e);
+ }
+ }
+ return s_oNH;
+ } // ______________________________
+
+} // ____________________________________________________________________________
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.addressing.helpers.JMSEpr;
+
+public class JmsQueueListener
+{
+ public JmsQueueListener(EsbListenerController controller, ConfigTree config)
+ throws Exception
+ {
+ _controller = controller;
+ _config = config;
+ _iSleepForThreads = 3;
+ checkMyParms();
+ } // __________________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws Exception
+ {
+ // Third arg is null - Exception will br thrown if listenQueue is not found
+ String sQueue = obtainAttribute (JMSEpr.DESTINATION_NAME_TAG, null);
+
+ // No problem if selector is null - everything in queue will be returned
+ _sSelector = _config.getAttribute(ListenerPropertyNames.MESSAGE_SELECTOR);
+
+ _oQconn = null;
+ _oQsess = null;
+ _oQueue = null;
+
+ String sJndiType = obtainAttribute (ListenerPropertyNames.JNDI_TYPE ,"jboss");
+ String sJndiURL = obtainAttribute (ListenerPropertyNames.JNDI_URL ,"localhost");
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+ String sFactClass = obtainAttribute(JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+ _oQconn = qcf.createQueueConnection();
+ _oQueue = (Queue) oJndiCtx.lookup(sQueue);
+ _oQsess = _oQconn.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ _oQconn.start();
+ _receiver = _oQsess.createReceiver(_oQueue, _sSelector);
+
+ } // ________________________________
+
+
+ protected org.jboss.soa.esb.message.Message receiveEsbMessage(long millis)
+ {
+ javax.jms.Message jmsMessage = null;
+ try { jmsMessage = _receiver.receive(millis); }
+ catch (JMSException oJ)
+ {
+ _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
+ for (int i1 = 0; i1 < 3; i1++)
+ // try to reconnect to the queue
+ try { checkMyParms(); }
+ catch (Exception e)
+ {
+ _logger.error("Reconnecting to Queue", e);
+ try { Thread.sleep(_iSleepForThreads); }
+ catch (InterruptedException e1)
+ { // Just return after logging
+ _logger.error("Unexpected thread interupt exception.", e);
+ return null;
+ }
+ }
+ }
+ if (null == jmsMessage)
+ return null;
+
+ if (!(jmsMessage instanceof ObjectMessage))
+ {
+ _logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+ return null;
+ }
+ try
+ {
+ return (org.jboss.soa.esb.message.Message)((ObjectMessage)jmsMessage).getObject();
+ }
+ catch (JMSException e1)
+ { _logger.error("Failed to read Serialized Object from JMS message.", e1);
+ return null;
+ }
+ catch (ClassCastException e2)
+ { _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+ }
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+
+ protected void cleanup()
+ {
+ if (null != _oQsess)
+ try { _oQsess.close(); }
+ catch (Exception e1) {/* Tried my best - Just continue */ }
+ if (null != _oQconn)
+ try { _oQconn.close(); }
+ catch (Exception e2) {/* Tried my best - Just continue */ }
+ }
+
+ private String obtainAttribute(String p_sAtt, String p_sDefault)
+ throws ConfigurationException
+ {
+ String sVal = _config.getAttribute(p_sAtt);
+ if ((null == sVal) && (null == p_sDefault))
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+
+ protected EsbListenerController _controller;
+ protected ConfigTree _config;
+ protected MessageConsumer _receiver;
+ protected boolean _bError = false;
+ protected QueueConnection _oQconn;
+ protected QueueSession _oQsess;
+ protected Queue _oQueue;
+ protected String _sSelector;
+ protected int _iSleepForThreads;
+
+ protected static transient Logger _logger = Logger.getLogger(JmsQueueListener.class);
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,10 @@
+package org.jboss.soa.esb.message.listeners;
+
+/**
+ * Attribute names used in listener configuration.
+ */
+public class ListenerPropertyNames
+{
+    /** Attribute name "consumer-type" */
+    public static final String CONSUMER_TYPE = "consumer-type";
+
+    /** Attribute name "jndi-type" */
+    public static final String JNDI_TYPE = "jndi-type";
+
+    /** Attribute name "jndi-URL" (note the upper-case URL) */
+    public static final String JNDI_URL = "jndi-URL";
+
+    /** Attribute name "message-selector" */
+    public static final String MESSAGE_SELECTOR = "message-selector";
+
+    /** Attribute name "action-definition-factory" */
+    public static final String ACTION_DEFINITION_FACTORY = "action-definition-factory";
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java 2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java 2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import java.text.*;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+
+/**
+ * Use this class to tune your XML configurations <p/>Once your config works
+ * with this dummy class, you can switch to your own action class <p/>
+ * This class only logs the task object of each message and returns the
+ * message unchanged. Presumably your own action class needs the same
+ * ConfigTree constructor and process(Message) method - TODO confirm against
+ * the code that instantiates actions.
+ *
+ * @author Esteban
+ *
+ */
+public class MockAction
+{
+    private static final Logger _logger = Logger.getLogger(MockAction.class);
+
+    // HH = hour of day (0-23). The former "hh" is the 12-hour clock and, with
+    // no am/pm marker in the pattern, printed the same stamp twice a day.
+    private static final String STAMP_PATTERN = "yyyy/MM/dd HH:mm:ss.SSS";
+
+    /**
+     * @param config - ignored by this mock
+     */
+    public MockAction(ConfigTree config) {}
+
+    /**
+     * Log the task object found in the message, prefixed with a timestamp.
+     *
+     * @param message - the message to inspect
+     * @return the same message instance that was passed in
+     */
+    public Message process(Message message)
+    {
+        Object oCurr = ActionUtils.getTaskObject(message);
+        if (null == oCurr)
+            oCurr = "null";
+        _logger.info(getStamp() + " process was called with <<" + oCurr.toString() + ">>");
+        return message;
+    } // ________________________________
+
+    /**
+     * @return the current time formatted with STAMP_PATTERN
+     */
+    private String getStamp()
+    {
+        // SimpleDateFormat is not thread-safe, so a formatter is created per
+        // call instead of sharing one static instance between all actions
+        return new SimpleDateFormat(STAMP_PATTERN).format(new java.util.Date());
+    }
+
+} // ____________________________________________________________________________
More information about the jboss-svn-commits
mailing list