[jboss-svn-commits] JBL Code SVN: r6808 - in labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb: . actions message message/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Oct 14 18:07:26 EDT 2006


Author: estebanschifman
Date: 2006-10-14 18:07:21 -0400 (Sat, 14 Oct 2006)
New Revision: 6808

Added:
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java
Modified:
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
Log:
Initial cut of new 'Message aware' listeners and actions

Modified: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -22,6 +22,7 @@
 
 package org.jboss.soa.esb.actions;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.message.Body;
 
 /**
@@ -31,8 +32,9 @@
  */
 public class ActionUtils 
 {
-	public static final String BEFORE_ACTION="org.jboss.soa.esb.actions.current.before";
-	public static final String CURRENT_OBJECT="org.jboss.soa.esb.actions.current.after";
+	public  static final String BEFORE_ACTION="org.jboss.soa.esb.actions.current.before";
+	private static final String CURRENT_OBJECT="org.jboss.soa.esb.actions.current.after";
+	private static final String CONFIG_TREE	="org.jboss.soa.esb.helpers.ConfigTree";
 
     /**
      * Obtain the current object from standard spot within message 
@@ -76,4 +78,31 @@
     		message.getBody().add(CURRENT_OBJECT,obj);
     	return oRet;
     }	
+
+    /**
+     * Swap the ConfigTree kept in the message body under the reserved key.
+     * <br/>The previous entry is always removed first; a &lt;null&gt; tree is
+     * never stored, so passing &lt;null&gt; only clears the slot.
+     * @param message - Message whose body is modified
+     * @param tree - ConfigTree to store - nothing is stored if &lt;null&gt;
+     * @return ConfigTree that was removed from the body (whatever remove() returned)
+     */
+    public static ConfigTree setConfigTree(Message message, ConfigTree tree)
+    {
+    	final ConfigTree previous = (ConfigTree)message.getBody().remove(CONFIG_TREE);
+    	if (tree != null)
+    	{
+    		message.getBody().add(CONFIG_TREE,tree);
+    	}
+    	return previous;
+    }	
+    /**
+     * Fetch the ConfigTree kept in the message body under the reserved key.
+     * @param message - Message to read from
+     * @return ConfigTree found in the body, or a new ConfigTree named "mock"
+     * when the body holds none - this method does not return &lt;null&gt;
+     */
+    public static ConfigTree getConfigTree(Message message)
+    {
+    	final ConfigTree stored = (ConfigTree)message.getBody().get(CONFIG_TREE);
+    	if (null!=stored)
+    		return stored;
+    	// Nothing stored: fall back to a placeholder tree
+    	return new ConfigTree("mock");
+    }	
 }

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ActionProcessingPipeline.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,148 @@
+package org.jboss.soa.esb.message.listeners;
+
+import java.lang.reflect.*;
+
+import javax.enterprise.deploy.spi.exceptions.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.jboss.remoting.samples.chat.exceptions.InvalidArgumentException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+
+/**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs a list of action classes on a message
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+class ActionProcessingPipeline implements Runnable 
+{    
+         
+    /**
+     * public constructor
+     * <p/>Reads the ConfigTree stored in the message (via ActionUtils) and keeps
+     * its "action" child elements as the list of actions to run.
+     * @param message Message - The initial message to be run through the action class chain
+     * @throws IllegalArgumentException if message is null or no configuration is available
+     * @throws ConfigurationException if the configuration has no "action" child elements
+     * @throws InvalidArgumentException declared in the signature (there is no
+     * explicit throw of it in this constructor)
+     */
+    public ActionProcessingPipeline(Message message)
+    	throws InvalidArgumentException, ConfigurationException
+    {
+    	// Validate the argument - the _message field has not been assigned yet
+    	if (null==message)
+    		throw new IllegalArgumentException("Message must be not null");
+        _message	= message;
+        _config		= ActionUtils.getConfigTree(_message);
+    	if (null==_config)
+    		throw new IllegalArgumentException("Configuration needed for action classes");
+    	_actionList = _config.getChildren("action");
+    	if (null==_actionList || _actionList.length<1)
+    		throw new ConfigurationException("No 'action' elements found in configuration");
+    }
+    
+    /**
+     * 
+     * @return Message - current Message of this action chain processor
+     */
+    public Message getMessage()     { return _message; }
+
+    /* (non-Javadoc)
+     * @see java.lang.Runnable#run()
+     *
+     * For each configured "action" element, in order: loads the class named by its
+     * "class" attribute, instantiates it through its (ConfigTree) constructor and
+     * invokes its process(Message) method reflectively. The value returned by one
+     * action becomes the message handed to the next one. The chain stops at the
+     * first action that cannot be loaded or instantiated, that throws, or that
+     * returns anything other than a non-null Message.
+     */
+    public void run()
+    {
+        try
+        {
+            // Run the message through each ActionProcessor...
+        	_currentIndex = -1;
+            for(ConfigTree oCurr : _actionList) 
+            {
+            	_currentIndex++;
+            	_currentAction = oCurr.getAttribute("class");
+
+            	Class<?> oClass = Class.forName(_currentAction);
+            	Constructor<?> oConst = oClass.getConstructor(new Class[] {ConfigTree.class});
+            	Object currentProcessor = oConst.newInstance(new Object[] {_config});
+            	Method method = oClass.getMethod("process",new Class[] {Message.class});
+            	
+            	// The processing result of each action feeds into the processing of the next action...
+                try 
+                {
+                	// copy currentObject in Message body to 'previous' currentObject
+                	ActionUtils.copyCurrentToPrevious(_message);
+                	Object result = method.invoke(currentProcessor,new Object[] {_message} );
+                    if(result==null)
+                    {
+                    	_logger.error(prematureTermination("returned <null> - Cannot continue"));
+                    	return;
+                    }
+                    if(!(result instanceof Message))
+                    {
+                    	// If action class returns non Message, log and abort chain
+                    	_logger.error(prematureTermination("returned a non Message Object)"));
+                    	return;
+                    }
+                    // Only a valid result counts as success; it is what the next action receives
+                    _message = (Message)result;
+                    actionClassFinishedOk(_message);
+                }
+                catch (Exception e)
+                {
+                	// If action class threw exception, log and abort chain
+                    actionClassException(e);
+                    return;
+                }
+            }
+            actionChainFinishedOk();
+        } 
+        catch(ClassNotFoundException e)
+    	{	_logger.error(prematureTermination("action class is not in path"),e); } 
+        catch(NoSuchMethodException e)
+    	{	_logger.error(prematureTermination("method not found"),e); }
+        catch(InstantiationException e)
+    	{	_logger.error(prematureTermination("cannot instantiate action class"),e); } 
+        catch(InvocationTargetException e)
+    	{	// Only the constructor call can reach this handler: invoke() is guarded above
+        	_logger.error(prematureTermination("action class constructor threw an exception"),e); } 
+        catch(IllegalAccessException e)
+    	{	_logger.error(prematureTermination("unable to access method"),e); } 
+        catch (IllegalArgumentException e)
+    	{	_logger.error(prematureTermination("illegal argument instantiating action class"),e); }
+    }
+    
+    /**
+     * Build the log text used when the pipeline stops before running all actions.
+     * @param s String - reason for stopping, appended at the end of the text
+     * @return String - text listing the configured action names, the action class
+     * being processed when the pipeline stopped, and the reason
+     */
+    protected String prematureTermination(String s)
+    {
+    	// Arrays.toString renders the names as "[a, b]"; appending the array itself
+    	// would only print its type and identity hash
+    	return new StringBuilder("Premature termination of action processing pipeline ")
+		.append(java.util.Arrays.toString(getActionNames()))
+		.append(".  ActionProcessor [").append(_currentAction)
+		.append("] ").append(s)
+		.toString()
+		;
+    }
+    
+    /**
+     * Hook invoked by run() when processing the current action fails with an exception.
+     * <p/>This default reports the failure, with its stack trace, through the class
+     * logger rather than printing to standard error.
+     * @param thr Throwable - the failure caught while processing the current action
+     */
+    protected void actionClassException(Throwable thr)
+    {
+    	_logger.error(prematureTermination("threw an exception"),thr);
+    }
+
+    /**
+     * Hook called by run() with the value returned by an action class.
+     * <p/>Does nothing in this class.
+     * @param message Message - value returned by the action. NOTE(review): run()
+     * may call this before checking that value for null - confirm before dereferencing
+     */
+    protected void actionClassFinishedOk(Message message)
+    {
+    }
+
+    /**
+     * Hook called by run() once the loop over all configured actions has
+     * completed without an early return.
+     * <p/>Does nothing in this class.
+     */
+    protected void actionChainFinishedOk()
+    {
+    }
+
+    protected String[] getActionNames()
+    {
+    	String[] sa = new String[_actionList.length];
+    	int i1=0;
+    	for (ConfigTree oCurr : _actionList)
+    		sa[i1++]=oCurr.getAttribute("name");
+    	return sa;
+    }
+    
+    protected ConfigTree[]	_actionList;
+    protected int			_currentIndex;
+    protected String		_currentAction;
+	protected Message		_message;
+	protected ConfigTree	_config;
+	protected Logger		_logger = Logger.getLogger(this.getClass());		
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,660 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.notification.NotificationList;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
+import org.jboss.soa.esb.services.NotificationManager;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+/**
+ * Controlling class that will launch listener child threads for supported
+ * transport listener classes, as indicated in the configuration XML tree
+ * pointed by arg[0]
+ * 
+ * <p />
+ * Can be launched as uppermost controller (it has a main(args) method)
+ * <p />
+ * Also implements Runnable, and can thus be launched in a child thread from an
+ * upper controlling process
+ * <p />
+ * Listens on a JMS queue (with an optional message selector) for commands (e.g.
+ * Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p />
+ * Parameter reloading can also be set using the PARM_RELOAD_SECS attribute
+ * <p />
+ * End time for this instance can also be set using the PARM_END_TIME attribute
+ * <p />
+ * 
+ * @author Esteban
+ * 
+ */
+public class EsbListenerController implements Runnable {
+
+	private static Logger m_oLogger = Logger.getLogger(EsbListenerController.class);
+
+	/**
+	 * Command line entry point: builds a controller from the repository entry
+	 * named by args[0], runs it in the calling thread, and exits the JVM with
+	 * the completion code of the controller's final state.
+	 * 
+	 * @param args
+	 *            args[0] - name of the repository entry containing the
+	 *            configuration
+	 * @throws Exception
+	 *             If args[0] is missing or the controller cannot be constructed
+	 */
+	public static void main(String[] args) throws Exception {
+		// Fail with a meaningful message instead of an ArrayIndexOutOfBoundsException
+		if (null == args || args.length < 1)
+			throw new IllegalArgumentException(
+					"Missing argument: name of the repository entry containing the listener configuration");
+
+		EsbListenerController oProc = new EsbListenerController(args[0]);
+		oProc.run();
+		EsbListenerController.State oS = oProc.getState();
+
+		if (null != oS.getException()) {
+			m_oLogger.error("EsbListenerController <" + args[0] + "> FAILED\n", oS
+					.getException());
+		}
+		System.exit(oS.getCompletionCode());
+	} // ________________________________
+
+	protected int m_iDfltReloadMillis = 180000 // default interval between
+												// parameter reloads
+	;
+
+	public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
+	public static final String PARM_END_TIME = "endTime";
+
+	// Attribute name that denotes listener class to be instantiated in a child
+	// thread
+	// This attribute is not in the root node but in first level child
+	// DomElements
+	public static final String PARM_LISTENER_CLASS = "listenerClass";
+	public static final String PARM_ACTIONS = "actions";
+	public static final String PARM_MAX_THREADS = "maxThreads";
+	public static final String CHLD_EMAIL_PARMS = "EmailProperties";
+
+	private String m_sParmsName;
+	private DomElement m_oParms;
+
+	private HashMap<String, Object> m_oAtts;
+
+	/**
+	 * Obtain a shallow copy of needed attributes in this object's last loaded
+	 * parameter tree <p/>The local object is cloned so child threads can use it
+	 * as they choose to without interfering with the environment
+	 * <p />
+	 * Listener processes controlled by this object should keep a reference to
+	 * this object at construction time, and not call this method again unless
+	 * they specifically need updated values. Parameter reload could have
+	 * happened since last call
+	 * 
+	 * @return Map - a shallow copy of the attributes Map; an empty Map when no
+	 *         attributes have been stored
+	 */
+	@SuppressWarnings("unchecked")
+	public Map<String, Object> getControllerAttributes() {
+		// The attribute map may never have been populated - don't fail on clone()
+		if (null == m_oAtts)
+			return new HashMap<String, Object>();
+		return (Map<String, Object>) m_oAtts.clone();
+	}
+
+	private boolean m_bReloadRequested, m_bEndRequested;
+
+	private long m_lNextReload = Long.MAX_VALUE;
+
+	private long m_lEndTime = Long.MAX_VALUE;
+
+	// 'HH' (hour of day, 0-23) rather than 'hh' (clock hour, 1-12): end times are
+	// given in 24 hour form (the endTime command defaults to "23:59:59"), and
+	// with 'hh' an hour of 12 is read as midnight.
+	// NOTE: SimpleDateFormat is not synchronized - callers on different threads
+	// must synchronize externally.
+	public static final SimpleDateFormat s_oDateParse = new SimpleDateFormat(
+			"yyyyMMdd HH:mm:ss");
+
+	private State m_oState = null;
+
+	/**
+	 * @return State - the current state of this controller, as last set by the
+	 *         constructor or by run()
+	 */
+	public State getState() {
+		return m_oState;
+	}
+
+	/**
+	 * Life cycle states of a controller, each carrying a completion code and
+	 * an optional exception.
+	 * <p/>
+	 * NOTE(review): the two fields below belong to the enum constants, and enum
+	 * constants are single instances per JVM - a value written through one
+	 * controller's state is therefore visible through every other reference to
+	 * the same constant.
+	 */
+	public static enum State {
+		Loading_parameters, Running, Shutting_down, Done_OK, Exception_thrown;
+		// Completion code reported by getCompletionCode() - 0 unless assigned
+		int m_iCompletionCode = 0;
+
+		// Exception reported by getException() - null unless assigned
+		Exception m_oException = null;
+
+		/** @return int - the completion code stored on this constant */
+		public int getCompletionCode() {
+			return m_iCompletionCode;
+		};
+
+		/** @return Exception - the exception stored on this constant, or null */
+		public Exception getException() {
+			return m_oException;
+		}
+	};
+
+	private CommandQueue commandQueue;
+
+    private ActionDefinitionFactory actionDefinitionFactory;
+
+	private static CommandQueue defaultCommandQueue = null;
+
+	/**
+	 * Protected no-argument constructor. Performs no initialisation: no
+	 * configuration is loaded and the state is left unset.
+	 */
+	protected EsbListenerController() {		
+	}
+	
+	/**
+	 * Construct a Listener Manager from the named repository based
+	 * configuration.
+	 * <p/>
+	 * The name is remembered after the configuration has been loaded; run()
+	 * only attempts periodic reloads when a name has been recorded.
+	 * 
+	 * @param p_sParameterName
+	 *            Name of the Repository entry containing the configuration.
+	 * @throws Exception
+	 *             Unable to load/use the named configuration.
+	 */
+	public EsbListenerController(String p_sParameterName) throws Exception {
+		this(EsbListenerController.getListenerConfig(p_sParameterName));
+		m_sParmsName = p_sParameterName;
+	}
+
+	/**
+	 * Construct a Listener Manager using the specified listener configuration.
+	 * 
+	 * @param config
+	 *            The configuration.
+	 * @throws Exception
+	 *             Unable to load/use the supplied configuration.
+	 */
+	public EsbListenerController(DomElement config) throws Exception {
+		m_oParms = config;
+		m_oState = State.Loading_parameters;
+
+		try {
+			checkParms(m_oParms);
+			setEmailSystemProperties();
+		} catch (Exception e) {
+			String configSource = config.getAttr("configSource");
+
+			m_oState = State.Exception_thrown;
+			m_oState.m_oException = e;
+			m_oLogger.fatal("Listener configuration and startup error.  Config Source: "
+									+ (configSource != null ? configSource
+											: "unknown"), e);
+
+			throw e;
+		}
+	}
+
+	/**
+	 * Load the named listener configuration from the configured parameter
+	 * repository.
+	 * 
+	 * @param reposParam
+	 *            The name of the repository entry containing the Listener
+	 *            configuration.
+	 * @return Listener Configuration as {@link DomElement}.
+	 * @throws IOException
+	 *             Unable to access the repository.
+	 * @throws ParamRepositoryException
+	 *             Unable to access the configuration in the repository.
+	 * @throws SAXException
+	 *             Unable to parse the configuration.
+	 */
+	private static DomElement getListenerConfig(String reposParam)
+			throws IOException, ParamRepositoryException, SAXException {
+		String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+		DomElement config = DomElement.fromXml(sXml);
+
+		config.setAttr("configSource", "param-repository:" + reposParam);
+
+		return config;
+	}
+
+	/**
+	 * Check to see if all needed parameters are there, and assign default
+	 * values to some of them
+	 * <p/>
+	 * Side effects: clears the reload request flag, creates and opens the
+	 * command queue (if any), recomputes the next reload time and the end time,
+	 * and rebuilds the action definition factory from the "Actions" child.
+	 * 
+	 * @param p_oP
+	 *            DomElement - Where to look for the mandatory/optional
+	 *            configuration attributes
+	 * @throws Exception -
+	 *             If attributes are wrong or not enough for a proper runtime
+	 *             configuration (unparseable reload seconds or end time, or
+	 *             no "Actions" child element)
+	 */
+	public void checkParms(DomElement p_oP) throws Exception {
+		// We've just loaded - set to false until next reload requested
+		m_bReloadRequested = false;
+		// NOTE(review): this runs on every reload as well; a command queue opened
+		// by an earlier call is replaced here without being closed - confirm
+		// whether that can leak a queue connection
+		commandQueue = createCommandQueue(p_oP);
+
+		// Open the command queue...
+		if (null!=commandQueue)
+			commandQueue.open(p_oP);
+
+		// If PARM_RELOAD_SECS is set, reload that many seconds from now.
+		// Otherwise: with no command queue never reload (Long.MAX_VALUE), and
+		// with a command queue reload m_iDfltReloadMillis from now.
+		// NOTE(review): the original comment described the opposite default
+		// (periodic reload only when there is NO command queue, "every 10
+		// minutes") - confirm which of the two is intended
+		String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
+		m_lNextReload = (null != sRldSecs) 
+				? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
+				: (null == commandQueue) 
+						? Long.MAX_VALUE 
+						: System.currentTimeMillis() + m_iDfltReloadMillis;
+
+		// if PARM_END_TIME not set try to run forever
+		// not a good practice if command queue is not set
+		// The value is parsed with s_oDateParse
+		String sEndT = p_oP.getAttr(PARM_END_TIME);
+		m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+				sEndT).getTime();
+
+        // Read and initialise the action definitions...
+        DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+        if(actionConfig == null) {
+            throw new ConfigurationException("No 'Actions' configuration.");
+        }        
+        actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
+        
+	} // ________________________________
+
+    /**
+     * Factory method for creating the command queue.
+     * @param config EsbListenerController config.
+     * @return EsbListenerController CommandQueue instance.
+     */
+	private CommandQueue createCommandQueue(DomElement config) {
+		String commandQueueClass = config.getAttr("command-queue-class");
+		
+		if(commandQueueClass != null) {
+			try {
+				return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+			} catch (Exception e) {
+				m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to no Command Queue", e);
+			}
+		}
+			
+		return defaultCommandQueue;
+	}
+
+	/**
+	 * Allows a default command queue to be set statically for all EsbListenerController instances.
+	 * The default is what createCommandQueue() falls back to when a configuration
+	 * names no command queue class, or the named class cannot be instantiated.
+	 * @param defaultCommandQueue The defaultCommandQueue to set.
+	 */
+	public static void setDefaultCommandQueue(CommandQueue defaultCommandQueue) {
+		EsbListenerController.defaultCommandQueue = defaultCommandQueue;
+	}
+
+	/**
+	 * Main execution loop <p/> Will continue to run until either <p/>a) run
+	 * time is expired <p/>b) quiesce command is received in command queue
+	 * <p/>For every child element that contains a PARM_LISTENER_CLASS
+	 * attribute, this method will try to launch a child thread instantiating an
+	 * object of that class, and will call its run() method <p/>Once all child
+	 * processes are triggered, the main thread will either <p/>1) wait for a
+	 * message in the command queue (if one was configured) until next reload or
+	 * end of run period expired <p/>or 2) Just sleep if there's no command
+	 * queue to listen on
+	 * <p/>On exit the state is set to Done_OK and the command queue, if there
+	 * is one, is closed.
+	 */
+	public void run() {
+		while (endNotRequested()) {
+			m_oState = State.Running;
+			for (DomElement oCurr : m_oParms.getAllElemChildren()) {
+				String sClass = oCurr.getAttr(PARM_LISTENER_CLASS);
+				if (Util.isNullString(sClass))
+					continue;
+				tryToLaunchChildListener(oCurr, sClass);
+			}
+
+			waitForCmdOrSleep();
+
+			if (endRequested()) {
+				break;
+			}
+			// Reloading is only possible when the configuration came from a
+			// named repository entry
+			if (m_sParmsName != null && timeToReload()) {
+				try {
+					m_oState = State.Loading_parameters;
+					m_oLogger
+							.info("Reloading parameters _____________________________________________________");
+					DomElement oNew = EsbListenerController.getListenerConfig(m_sParmsName);
+					checkParms(oNew);
+					m_oParms = oNew;
+					setEmailSystemProperties();
+				} catch (Exception e) {
+					m_oLogger.error("Failed to reload parameters"
+							+ " - Continuing with cached version", e);
+				}
+			}
+		}
+		// m_oState = State.Shutting_down;
+
+		m_oState = State.Done_OK;
+		m_oState.m_iCompletionCode = 0;
+		m_oLogger
+				.info("Finishing_____________________________________________________");
+
+		// Close the command queue - a controller may be running without one
+		if (null != commandQueue) {
+			try {
+				commandQueue.close();
+			} catch (CommandQueueException e) {
+				m_oLogger.error("Error closing Command Queue.", e);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Instantiate the named listener class through its (EsbListenerController,
+	 * DomElement, ActionDefinitionFactory) constructor and start it in a new
+	 * thread. Any failure is logged and the listener is simply not launched.
+	 * 
+	 * @param p_oP
+	 *            DomElement - configuration element handed to the listener
+	 * @param p_sClassName
+	 *            String - name of the listener class; it must implement Runnable
+	 */
+	private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
+		try {
+			Class<?> oListener = Class.forName(p_sClassName);
+			// Look the constructor up by the declared controller type:
+			// getConstructor matches parameter types exactly, so this.getClass()
+			// would miss it whenever the controller is a subclass instance
+			Constructor<?> oConst = oListener.getConstructor(new Class[] {
+					EsbListenerController.class, DomElement.class, ActionDefinitionFactory.class });
+			Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
+					p_oP, actionDefinitionFactory });
+			new Thread(oRun).start();
+		} catch (Exception e) {
+			m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);
+		}
+	} // ________________________________
+
+	long millisToWait() {
+		return Math.min(m_lNextReload, m_lEndTime) - System.currentTimeMillis();
+	} // ________________________________
+
+	/**
+	 * Block until the next reload or end time is reached or - when a command
+	 * queue is configured - until a received command asks for shutdown or
+	 * reload, or a receive times out.
+	 */
+	private void waitForCmdOrSleep() {
+		long lToGo = millisToWait();
+
+		if (null == commandQueue) {
+			m_oLogger.debug("About to sleep " + lToGo);
+			// No command queue nor topic - Just sleep until time
+			// exhausted, or thread interrupted
+			try {
+				if (lToGo > 0)
+					Thread.sleep(lToGo);
+			} catch (InterruptedException e) {
+				m_lEndTime = 0; // mark as end requested and return
+				// Restore the interrupt flag so code further up the stack can
+				// still see that the thread was interrupted
+				Thread.currentThread().interrupt();
+			}
+			return;
+		}
+
+		// Wait for commands until time exhausted or command received
+		// Note that received commands might change time variables (reload/end)
+		// that's why time to go is recalculated on each cycle
+		while ((lToGo = millisToWait()) > 0) {
+			try {
+				m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+				String oM = commandQueue.receiveCommand(lToGo);
+				if (null == oM) {
+					return;
+				}
+				processCommand(oM);
+				if (endRequested() || timeToReload()) {
+					break;
+				}
+			} catch (CommandQueueException eJ) {
+				m_oLogger.info("receive on command queue failed", eJ);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Processes the command that has been received in the command queue (or
+	 * topic) <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be
+	 * changed
+	 * 
+	 * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+	 * width="400"/> </COLGROUP>
+	 * <TR>
+	 * <TD align="center">message text</TD>
+	 * <TD align="center">effect</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>shutdown*</TD>
+	 * <TD>End time will be immediately set to 'now' - quiesce process will
+	 * start - Child threads will be allowed to finish normally</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>reload param*</TD>
+	 * <TD>Parameters will be immediately reloaded, and listener reconfigured
+	 * with new values</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+	 * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+	 * end of day assumed (23:59:59)</TD>
+	 * </TR>
+	 * </TABLE> * startsWith() <p/>
+	 * 
+	 * @param p_oMsg
+	 *            Message received from the command queue.
+	 * 
+	 */
+	private void processCommand(String sTxt) {
+		if (null == sTxt)
+			return;
+		
+		String sLow = sTxt.trim().toLowerCase();
+		if (sLow.startsWith("shutdown")) {
+			m_bEndRequested = true;
+			m_oLogger.info("Shutdown has been requested");
+			return;
+		}
+		if (sLow.startsWith("reload param")) {
+			m_bReloadRequested = true;
+			m_oLogger
+					.info("Request for parameter reload has been received");
+			return;
+		}
+		String[] sa = sLow.split("\\s+");
+		if (sa.length > 1 && "endtime".equals(sa[0])) {
+			try {
+				String sDate = sa[1];
+				String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+						: sa[2];
+				Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+				m_oLogger.info("New end date set to : " + oEnd);
+				m_lEndTime = oEnd.getTime();
+			} catch (Exception eDat) {
+				m_oLogger.info("Problems with endTime command", eDat);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Tells whether this controller has to stop: either the end-requested flag
+	 * has been set, or the current time has reached the end time.
+	 * 
+	 * @return boolean - true if processing has to stop
+	 */
+	public boolean endRequested() {
+		if (m_bEndRequested)
+			return true;
+		return System.currentTimeMillis() >= m_lEndTime;
+	}
+
+	/**
+	 * Negation of {@link #endRequested()}.
+	 * 
+	 * @return boolean - true if run time has not expired and no end has been
+	 *         requested
+	 */
+	public boolean endNotRequested() {
+		final boolean stop = endRequested();
+		return !stop;
+	}
+
+	/**
+	 * Tells whether parameters have to be reloaded: either the reload-requested
+	 * flag has been set, or the current time has reached the next reload time.
+	 * <p/> For child threads this means thread execution has to end
+	 * <p/>
+	 * Child processes should only call this method when they are idle (as
+	 * opposed to in the middle of executing a unit of work)
+	 * 
+	 * @return boolean - true if it's time to reload parameters
+	 */
+	public boolean timeToReload() {
+		if (m_bReloadRequested)
+			return true;
+		return System.currentTimeMillis() >= m_lNextReload;
+	}
+
+	/**
+	 * Helper for child processes deciding whether to start yet another
+	 * execution cycle.
+	 * 
+	 * @return boolean - true if no end is requested and it is not yet time to
+	 *         reload parameters
+	 */
+	public boolean continueLooping() {
+		if (!endNotRequested())
+			return false;
+		return !timeToReload();
+	} // ________________________________
+
+	private static final String[] s_saMailProps = { Environment.SMTP_HOST,
+		Environment.SMTP_USERNAME, Environment.SMTP_PASSWORD,
+		Environment.SMTP_PORT, Environment.SMTP_FROM,
+		Environment.SMTP_AUTH };
+
+	private void setEmailSystemProperties() {
+		DomElement oEmail = m_oParms.getFirstElementChild(CHLD_EMAIL_PARMS);
+		if (null != oEmail)
+			for (String sCurr : s_saMailProps) {
+				String sProp = oEmail.getAttr(sCurr);
+				if (null != sProp)
+					ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).setProperty(sCurr, sProp);
+			}
+	} // ________________________________
+
+	/**
+	 * Read an attribute from the given element, falling back to a default.
+	 * 
+	 * @param p_oP
+	 *            DomElement - look for attributes in this Element only
+	 * @param p_sAtt
+	 *            String - Name of attribute to find
+	 * @param p_sDefault
+	 *            String - value to use if the attribute is not there; may be
+	 *            null to make the attribute mandatory
+	 * @return String - value of attribute, or the default value when the
+	 *         attribute is absent
+	 * @throws ConfigurationException -
+	 *             If requested attribute not found and no default value
+	 *             supplied by invoker
+	 */
+	public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+			throws ConfigurationException {
+		final String found = p_oP.getAttr(p_sAtt);
+		if (null != found)
+			return found;
+		if (null == p_sDefault)
+			throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+		return p_sDefault;
+	} // ________________________________
+
+	/**
+	 * Send the supplied object to every "NotificationList" child element for
+	 * which NotificationList.isOK() is true.
+	 * <p/>
+	 * Notification is best effort: a failure is logged and not propagated.
+	 * 
+	 * @param p_oP -
+	 *            DomElement to search for "NotificationList" child Elements
+	 * @param p_oSer
+	 *            Serializable - Will constitute the body of the notification;
+	 *            nothing is sent if null
+	 */
+	public static void notifyOK(DomElement p_oP, Serializable p_oSer) {
+        if(p_oSer ==  null) {
+            return;
+        }
+        
+		try {
+			for (DomElement oCurr : p_oP
+					.getElementChildren(NotificationList.ELEMENT)) {
+				NotificationList oNL = new NotificationList(oCurr);
+				if (!oNL.isOK())
+					continue;
+				// NOTE(review): notifyError() passes the NotificationList here
+				// rather than the raw element - confirm which one is intended
+				getNotifHandler().sendNotifications(oCurr, p_oSer);
+			}
+		} catch (Exception e) {
+			// Report the failure instead of dropping it silently
+			m_oLogger.error("Failed to send OK notifications", e);
+		}
+	} // __________________________________
+
+	/**
+	 * Send an error text to every "NotificationList" child element for which
+	 * NotificationList.isErr() is true. The text is the string form of the
+	 * supplied object followed by the stack trace of the exception, if any.
+	 * <p/>
+	 * Notification is best effort: a failure is logged and not propagated.
+	 * 
+	 * @param p_oP -
+	 *            DomElement to search for "NotificationList" child Elements
+	 * @param p_e -
+	 *            Exception if not null, will be appended to the body
+	 * @param p_oSer
+	 *            Serializable - Will be included at the beginning of the body
+	 *            of the notification; nothing is sent if null
+	 */
+	public static void notifyError(DomElement p_oP, Exception p_e, Serializable p_oSer) {
+        if(p_oSer ==  null) {
+            return;
+        }
+        
+		ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+		PrintStream oPS = new PrintStream(oBO);
+		try {
+			oPS.println(p_oSer.toString());
+			if (null != p_e)
+				p_e.printStackTrace(oPS);
+			oPS.close();
+
+			String sMsg = oBO.toString();
+			for (DomElement oCurr : p_oP
+					.getElementChildren(NotificationList.ELEMENT)) {
+				NotificationList oNL = new NotificationList(oCurr);
+				if (!oNL.isErr())
+					continue;
+				getNotifHandler().sendNotifications(oNL, sMsg);
+			}
+		} catch (Exception e) {
+			// Report the failure instead of dropping it silently
+			m_oLogger.error("Failed to send error notifications", e);
+		}
+	} // ________________________________
+
+	// volatile: getNotifHandler() reads this field outside the lock, which is
+	// only safe for double-checked locking when the field is volatile
+	private static volatile NotificationManager s_oNH;
+
+	// Lock guarding the lazy creation of s_oNH
+	private static final Object s_oSync = new Object();
+
+	/**
+	 * Lazy instantiator of the notification handler. The handler is created at
+	 * most once; a failed creation is logged and retried on the next call.
+	 * 
+	 * @return - a reference to an implementation of the interface or null if it
+	 *         can't be instantiated
+	 */
+	protected static NotificationManager getNotifHandler() {
+		if (null != s_oNH)
+			return s_oNH;
+		synchronized (s_oSync) {
+			if (null == s_oNH)
+				try {
+					s_oNH = NotificationHandlerFactory.getNotifHandler(
+							"remote", Configuration.getJndiServerType(),
+							Configuration.getJndiServerURL());
+				} catch (Exception e) {
+					Logger.getLogger(EsbListenerController.class).error(
+							"Notification FAILED", e);
+				}
+		}
+		return s_oNH;
+	} // ______________________________
+
+} // ____________________________________________________________________________

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.addressing.helpers.JMSEpr;
+
+/**
+ * Receives ESB messages from a JMS queue.
+ * <p/>
+ * The queue name, optional message selector, JNDI settings and connection
+ * factory name are read from the supplied ConfigTree. The JMS connection,
+ * session and receiver are created in {@link #checkMyParms()}, which is also
+ * used to reconnect after a failed receive.
+ */
+public class JmsQueueListener
+{
+    /** How many times a reconnect is attempted after a failed receive. */
+    private static final int MAX_RECONNECT_ATTEMPTS = 3;
+
+    /**
+     * Stores the arguments and immediately connects to the configured queue.
+     *
+     * @param controller - the controller that owns this listener
+     * @param config - parameter tree holding the listener attributes
+     * @throws Exception - if a mandatory attribute is missing or the JMS
+     *             resources cannot be obtained
+     */
+    public JmsQueueListener(EsbListenerController controller, ConfigTree config)
+        throws Exception
+    {
+        _controller = controller;
+        _config = config;
+        // NOTE(review): this value is handed unchanged to Thread.sleep(), so
+        // it is a pause of 3 milliseconds between reconnect attempts - confirm
+        // that milliseconds (and not seconds) is what was intended
+        _iSleepForThreads = 3;
+        checkMyParms();
+    } // __________________________________
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree and
+     * (re)create the JMS connection, session and receiver from them.
+     * <p/>
+     * Any connection or session left over from a previous call is closed
+     * first, and everything opened by this call is closed again if the setup
+     * fails part way through, so repeated reconnects do not leak resources.
+     *
+     * @throws Exception -
+     *             if the mandatory queue name attribute is missing, or the
+     *             JNDI lookups or JMS calls fail
+     */
+    protected void checkMyParms() throws Exception
+    {
+        // Default is null - an exception will be thrown if the queue name is not found
+        String sQueue = obtainAttribute(JMSEpr.DESTINATION_NAME_TAG, null);
+
+        // No problem if selector is null - everything in queue will be returned
+        _sSelector = _config.getAttribute(ListenerPropertyNames.MESSAGE_SELECTOR);
+
+        // Release whatever an earlier call opened before dropping the references
+        cleanup();
+        _oQconn = null;
+        _oQsess = null;
+        _oQueue = null;
+
+        String sJndiType = obtainAttribute(ListenerPropertyNames.JNDI_TYPE, "jboss");
+        String sJndiURL = obtainAttribute(ListenerPropertyNames.JNDI_URL, "localhost");
+        String sFactClass = obtainAttribute(JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+
+        try
+        {
+            Context oJndiCtx = AppServerContext.getServerContext(sJndiType, sJndiURL);
+            QueueConnectionFactory qcf = (QueueConnectionFactory) oJndiCtx.lookup(sFactClass);
+
+            _oQconn = qcf.createQueueConnection();
+            _oQueue = (Queue) oJndiCtx.lookup(sQueue);
+            _oQsess = _oQconn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+            _oQconn.start();
+            _receiver = _oQsess.createReceiver(_oQueue, _sSelector);
+        }
+        catch (Exception e)
+        {
+            // Do not leave a half built connection open behind a failed setup
+            cleanup();
+            throw e;
+        }
+    } // ________________________________
+
+    /**
+     * Waits up to the given time for a JMS message and returns the ESB
+     * message it carries.
+     * <p/>
+     * If the receive itself fails, up to {@link #MAX_RECONNECT_ATTEMPTS}
+     * reconnects are attempted (stopping at the first one that succeeds) and
+     * null is returned for this call.
+     *
+     * @param millis - timeout handed to the JMS receiver
+     * @return the ESB message, or null if nothing was received, the receive
+     *         failed, the JMS message was not an ObjectMessage, or its payload
+     *         was not an org.jboss.soa.esb.message.Message
+     */
+    protected org.jboss.soa.esb.message.Message receiveEsbMessage(long millis)
+    {
+        javax.jms.Message jmsMessage = null;
+        try
+        {
+            jmsMessage = _receiver.receive(millis);
+        }
+        catch (JMSException oJ)
+        {
+            _logger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
+            for (int i1 = 0; i1 < MAX_RECONNECT_ATTEMPTS; i1++)
+            {
+                try
+                {
+                    checkMyParms();
+                    // Reconnected - further attempts would only open more connections
+                    break;
+                }
+                catch (Exception e)
+                {
+                    _logger.error("Reconnecting to Queue", e);
+                    try
+                    {
+                        Thread.sleep(_iSleepForThreads);
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        // Log, keep the interrupt visible to the caller, and give up
+                        _logger.error("Unexpected thread interupt exception.", e1);
+                        Thread.currentThread().interrupt();
+                        return null;
+                    }
+                }
+            }
+        }
+        if (null == jmsMessage)
+            return null;
+
+        if (!(jmsMessage instanceof ObjectMessage))
+        {
+            _logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+            return null;
+        }
+        try
+        {
+            return (org.jboss.soa.esb.message.Message) ((ObjectMessage) jmsMessage).getObject();
+        }
+        catch (JMSException e1)
+        {
+            _logger.error("Failed to read Serialized Object from JMS message.", e1);
+        }
+        catch (ClassCastException e2)
+        {
+            _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+        }
+        return null;
+    }
+
+    /**
+     * Closes the JMS session and then the JMS connection, if they exist.
+     * Failures while closing are deliberately ignored (best effort).
+     */
+    protected void cleanup()
+    {
+        if (null != _oQsess)
+        {
+            try { _oQsess.close(); }
+            catch (Exception e1) {/* Tried my best - Just continue */ }
+        }
+        if (null != _oQconn)
+        {
+            try { _oQconn.close(); }
+            catch (Exception e2) {/* Tried my best - Just continue */ }
+        }
+    }
+
+    /**
+     * Reads an attribute from the parameter tree, falling back to a default.
+     *
+     * @param p_sAtt - name of the attribute
+     * @param p_sDefault - value to use when the attribute is absent; pass
+     *            null to make the attribute mandatory
+     * @return the configured value, or the default if none is configured
+     * @throws ConfigurationException - if the attribute is absent and no
+     *             default was supplied
+     */
+    private String obtainAttribute(String p_sAtt, String p_sDefault)
+        throws ConfigurationException
+    {
+        String sVal = _config.getAttribute(p_sAtt);
+        if ((null == sVal) && (null == p_sDefault))
+            throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+        return (null != sVal) ? sVal : p_sDefault;
+    } // ________________________________
+
+    protected EsbListenerController _controller;
+    protected ConfigTree        _config;
+    protected MessageConsumer   _receiver;
+    protected boolean           _bError = false;
+    protected QueueConnection   _oQconn;
+    protected QueueSession      _oQsess;
+    protected Queue             _oQueue;
+    protected String            _sSelector;
+    protected int               _iSleepForThreads;
+
+    protected static transient Logger _logger = Logger.getLogger(JmsQueueListener.class);
+} 

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/ListenerPropertyNames.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,10 @@
+package org.jboss.soa.esb.message.listeners;
+
+/**
+ * Attribute names used when reading a listener's configuration.
+ */
+public class ListenerPropertyNames
+{
+    /** Attribute name literal "action-definition-factory". */
+    public static final String ACTION_DEFINITION_FACTORY = "action-definition-factory";
+
+    /** Attribute name literal "consumer-type". */
+    public static final String CONSUMER_TYPE = "consumer-type";
+
+    /** Attribute name literal "jndi-type". */
+    public static final String JNDI_TYPE = "jndi-type";
+
+    /** Attribute name literal "jndi-URL". */
+    public static final String JNDI_URL = "jndi-URL";
+
+    /** Attribute name literal "message-selector". */
+    public static final String MESSAGE_SELECTOR = "message-selector";
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java	2006-10-14 20:43:26 UTC (rev 6807)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/MockAction.java	2006-10-14 22:07:21 UTC (rev 6808)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.message.listeners;
+
+import java.text.*;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+
+/**
+ * Use this class to tune your XML configurations <p/>Once your config works
+ * with this dummy class, you can switch to your own action class <p/>Your own
+ * action class will have to offer the same constructor and process method as
+ * this one
+ * 
+ * @author Esteban
+ * 
+ */
+public class MockAction
+{
+    /**
+     * Pattern of the time stamp that prefixes every log line.
+     * NOTE(review): "hh" is the 12 hour clock and the pattern has no am/pm
+     * marker - confirm whether "HH" (24 hour clock) was intended.
+     */
+    private static final String STAMP_PATTERN = "yyyy/MM/dd hh:mm:ss.SSS";
+
+    private static final Logger _logger = Logger.getLogger(MockAction.class);
+
+    /**
+     * The configuration is accepted but not used by this mock.
+     * 
+     * @param config - parameter tree of the action
+     */
+    public MockAction(ConfigTree config) {}
+
+    /**
+     * Logs the task object carried by the message and hands the message back
+     * untouched.
+     * 
+     * @param message - the message to process
+     * @return the very same message that was passed in
+     */
+    public Message process(Message message)
+    {
+        Object oCurr = ActionUtils.getTaskObject(message);
+        if (null == oCurr)
+            oCurr = "null";
+        _logger.info(getStamp() + " process was called with <<" + oCurr.toString() + ">>");
+        return message;
+    } // ________________________________
+
+    /**
+     * Formats the current time with a formatter created for this call only.
+     * SimpleDateFormat is not safe for concurrent use, so an instance must
+     * not be shared between threads that may call process at the same time.
+     * 
+     * @return the current time rendered with the stamp pattern
+     */
+    private String getStamp()
+    {
+        return new SimpleDateFormat(STAMP_PATTERN).format(new java.util.Date());
+    }
+
+} // ____________________________________________________________________________




More information about the jboss-svn-commits mailing list