[jboss-svn-commits] JBL Code SVN: r6910 - in labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb: . dispatchers listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 19 09:42:46 EDT 2006


Author: estebanschifman
Date: 2006-10-19 09:42:40 -0400 (Thu, 19 Oct 2006)
New Revision: 6910

Added:
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java
   labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java
Log:
Classes needed for dispatching

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Base class for message dispatchers.
+ * <p/>
+ * A dispatcher is always created with an {@link EPR}, which is kept in
+ * {@link #_epr}. What dispatching to that EPR actually involves is left to
+ * the concrete subclass.
+ */
+public abstract class Dispatcher
+{
+	/** Endpoint reference received at construction time (package visible). */
+	EPR _epr;
+
+	/** Not usable: a dispatcher cannot be built without an EPR. */
+	private Dispatcher()
+	{
+	}
+
+	/**
+	 * Keep the endpoint reference for the concrete dispatcher.
+	 *
+	 * @param epr
+	 *            endpoint reference stored in {@link #_epr}; it is not
+	 *            validated here
+	 */
+	protected Dispatcher(EPR epr)
+	{
+		this._epr = epr;
+	}
+
+	/**
+	 * Dispatch a message.
+	 *
+	 * @param message
+	 *            the message to dispatch
+	 * @return boolean - meaning defined by the implementing class
+	 * @throws DispatcherException
+	 *             if the implementation is unable to dispatch the message
+	 */
+	public abstract boolean dispatch(Message message) throws DispatcherException;
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Dispatch Exception.
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+public class DispatcherException extends BaseException 
+{
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+     * Construct an exception instance. 
+     * @param message Exception message.
+     */
+    public DispatcherException(String message) { super(message); }
+
+    /**
+     * Construct an exception instance. 
+     * @param message Exception message.
+     * @param cause Exception cause.
+     */
+    public DispatcherException(String message, Throwable cause) { super(message, cause); }
+
+    /**
+     * Construct an exception instance. 
+     * @param cause Exception cause.
+     */
+    public DispatcherException(Throwable cause) { super(cause); }
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import java.net.URISyntaxException;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.helpers.*;
+
+public class DispatcherFactory 
+{
+	// private default constructor
+	private DispatcherFactory() {}
+
+	public static DispatcherFactory getInstance(Object obj)
+	{
+		return _instance;
+	}
+	
+	public static Dispatcher getDispatcher(EPR epr) throws DispatcherException
+	{
+		try
+		{
+			String address = epr.getAddr().getAddress();
+			if (address.startsWith(JMSEpr.JMS_PROTOCOL))
+				return new JmsDispatcher(epr);
+		}
+		catch (URISyntaxException e) { throw new DispatcherException(e); }
+		throw new DispatcherException("Unknown protocol");
+	}
+	
+	private static final DispatcherFactory _instance = new DispatcherFactory();
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Dispatcher for EPRs with a JMS address.
+ * <p/>
+ * Dispatching is not implemented yet: {@link #dispatch(Message)} does
+ * nothing and always returns false.
+ */
+public class JmsDispatcher extends Dispatcher
+{
+	/**
+	 * Package visible constructor; the EPR is handed to the base class.
+	 *
+	 * @param epr
+	 *            endpoint reference of the JMS destination
+	 */
+	JmsDispatcher(EPR epr)
+	{
+		super(epr);
+	}
+
+	/**
+	 * Placeholder implementation.
+	 *
+	 * @param message
+	 *            ignored for the time being
+	 * @return always false
+	 * @throws DispatcherException
+	 *             declared by the base class; never thrown by this stub
+	 */
+	@Override
+	public boolean dispatch(Message message) throws DispatcherException
+	{
+		// TODO not implemented - nothing is sent
+		final boolean dispatched = false;
+		return dispatched;
+	}
+}

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,550 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.util.EPRManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+public class GatewayListenerController implements Runnable
+{
+
+	public static void main(String[] args) throws Exception 
+	{
+		GatewayListenerController oProc = new GatewayListenerController(args[0]);
+		oProc.run();
+		GatewayListenerController.State oS = oProc.getState();
+
+		if (null != oS.getException()) {
+			_logger.error("GatewayListener <" + args[0] + "> FAILED\n", oS
+					.getException());
+		}
+		System.exit(oS.getCompletionCode());
+	} // ________________________________
+
+	// Root attribute read by checkParms(): number of seconds until the next
+	// parameter reload (the value is multiplied by 1000 and added to 'now')
+	public static final String RELOAD_SECONDS_TAG 	= "parameterReloadSecs";
+	// Root attribute read by checkParms(): end of the run period, parsed
+	// with _dateFormat. When absent the end time is Long.MAX_VALUE
+	public static final String END_TIME_TAG 		= "endTime";
+
+	// Attribute name that denotes listener class to be instantiated in a child thread
+	// This attribute is not in the root node but in first level child ConfigTrees
+	// (children without it are skipped by run())
+	public static final String GATEWAY_CLASS_TAG 	= "gatewayClass";
+
+	/**
+	 * Obtain a shallow copy of the needed attributes in this object's last
+	 * loaded parameter tree. <p/>The local object is cloned so child threads
+	 * can use it as they choose to without interfering with the environment
+	 * <p />
+	 * Listener processes controlled by this object should keep a reference to
+	 * the returned Map at construction time, and not call this method again
+	 * unless they specifically need updated values. Parameter reload could
+	 * have happened since last call
+	 * 
+	 * @return Map - a shallow copy of the attributes Map; an empty Map when
+	 *         no attributes have been stored (never null)
+	 */
+	@SuppressWarnings("unchecked")
+	public Map<String, Object> getControllerAttributes() 
+	{
+		// The attributes map may never have been populated: hand back an
+		// empty copy rather than failing with a NullPointerException
+		if (null == _attributes)
+			return new HashMap<String, Object>();
+
+		return (Map<String, Object>) _attributes.clone();
+	}
+
+	public State getState() {	return _status; }
+
+	/**
+	 * States a controller goes through, with an optional completion code and
+	 * exception attached.
+	 * <p/>
+	 * NOTE(review): the completion code and the exception are fields of the
+	 * enum constant itself. Enum constants are singletons, so a value stored
+	 * by one controller is visible through the same constant to every other
+	 * controller in the JVM.
+	 */
+	public static enum State
+	{
+		Loading_parameters, Running, Shutting_down, Done_OK, Exception_thrown;
+
+		/** Exception attached to this state, or null. */
+		Exception m_oException = null;
+
+		/** Completion code attached to this state (0 unless assigned). */
+		int m_iCompletionCode = 0;
+
+		/** @return Exception - the exception attached to this state, or null */
+		public Exception getException()
+		{
+			return m_oException;
+		}
+
+		/** @return int - the completion code attached to this state */
+		public int getCompletionCode()
+		{
+			return m_iCompletionCode;
+		}
+	}
+
+    // Built by checkParms() from the "Actions" child of the configuration and
+    // passed to gateway constructors in tryToLaunchGateway()
+    private ActionDefinitionFactory actionDefinitionFactory;
+
+	/**
+	 * Protected no-argument constructor (usable by subclasses and by classes
+	 * in this package). It loads no configuration: the configuration tree and
+	 * the state keep their initial null values.
+	 */
+	protected GatewayListenerController() { }
+	
+	/**
+	 * Build a controller from a configuration kept in the parameter
+	 * repository.
+	 * <p/>
+	 * The entry is loaded with getListenerConfig() and processed by the
+	 * ConfigTree based constructor. The entry name is remembered afterwards,
+	 * which is what allows run() to reload the parameters.
+	 * 
+	 * @param p_sParameterName
+	 *            Name of the Repository entry containing the configuration.
+	 * @throws Exception
+	 *             Unable to load/use the named configuration.
+	 */
+	public GatewayListenerController(String p_sParameterName) throws Exception
+	{
+		this(getListenerConfig(p_sParameterName));
+		this._sParametersName = p_sParameterName;
+	}
+
+	/**
+	 * Build a controller from an already loaded listener configuration.
+	 * <p/>
+	 * The state is set to Loading_parameters and the configuration is
+	 * validated with checkParms(). If validation fails the state becomes
+	 * Exception_thrown (with the exception attached), the problem is logged
+	 * as fatal together with the "configSource" attribute of the
+	 * configuration, and the exception is rethrown.
+	 * 
+	 * @param config
+	 *            The configuration.
+	 * @throws Exception
+	 *             Unable to load/use the supplied configuration.
+	 */
+	public GatewayListenerController(ConfigTree config) throws Exception
+	{
+		_config = config;
+		_status = State.Loading_parameters;
+
+		try
+		{
+			checkParms(_config);
+		}
+		catch (Exception e)
+		{
+			// Where did this configuration come from ?
+			String configSource = config.getAttribute("configSource");
+			if (null == configSource)
+				configSource = "unknown";
+
+			_status = State.Exception_thrown;
+			_status.m_oException = e;
+			_logger.fatal("Listener configuration and startup error.  Config Source: "
+					+ configSource, e);
+			throw e;
+		}
+	}
+
+	/**
+	 * Fetch a listener configuration from the parameter repository and parse
+	 * it.
+	 * <p/>
+	 * The resulting tree gets a "configSource" attribute with the value
+	 * "param-repository:" followed by the entry name.
+	 * 
+	 * @param reposParam
+	 *            The name of the repository entry containing the Listener
+	 *            configuration.
+	 * @return Listener Configuration as {@link ConfigTree}.
+	 * @throws IOException
+	 *             Unable to access the repository.
+	 * @throws ParamRepositoryException
+	 *             Unable to access the configuration in the repository.
+	 * @throws SAXException
+	 *             Unable to parse the configuration.
+	 */
+	private static ConfigTree getListenerConfig(String reposParam)
+			throws IOException, ParamRepositoryException, SAXException
+	{
+		ConfigTree config = ConfigTree.fromXml(
+				ParamRepositoryFactory.getInstance().get(reposParam));
+		config.setAttribute("configSource", "param-repository:" + reposParam);
+		return config;
+	}
+
+	/**
+	 * Validate a configuration and derive the controller's runtime values
+	 * from it: command queue, next reload time, end time and action
+	 * definitions.
+	 * 
+	 * @param p_oP
+	 *            ConfigTree - Where to look for the mandatory/optional
+	 *            configuration attributes
+	 * @throws Exception -
+	 *             If attributes are wrong or not enough for a proper runtime
+	 *             configuration (for instance a non numeric reload interval,
+	 *             an unparseable end time, or no 'Actions' child)
+	 */
+	public void checkParms(ConfigTree p_oP) throws Exception
+	{
+		// A fresh configuration is being processed: no reload pending
+		_reloadRequested = false;
+
+		// Create the command queue (there may be none) and open it
+		_commandQueue = createCommandQueue(p_oP);
+		if (_commandQueue != null) {
+			_commandQueue.open(DomElement.fromConfigTree(p_oP));
+		}
+
+		// Next reload:
+		//  - RELOAD_SECONDS_TAG present : that many seconds from now
+		//  - absent, no command queue   : never (Long.MAX_VALUE)
+		//  - absent, with command queue : _defaultReloadMillis from now
+		// NOTE(review): the comment this replaces described the last two
+		// cases the other way round - confirm which behaviour is intended
+		String sRldSecs = p_oP.getAttribute(RELOAD_SECONDS_TAG);
+		if (sRldSecs != null) {
+			_nextReload = System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs);
+		} else if (_commandQueue == null) {
+			_nextReload = Long.MAX_VALUE;
+		} else {
+			_nextReload = System.currentTimeMillis() + _defaultReloadMillis;
+		}
+
+		// End time: parsed with _dateFormat when END_TIME_TAG is present,
+		// otherwise try to run forever (not a good practice if there is no
+		// command queue)
+		String sEndT = p_oP.getAttribute(END_TIME_TAG);
+		if (sEndT == null) {
+			_endTime = Long.MAX_VALUE;
+		} else {
+			_endTime = _dateFormat.parse(sEndT).getTime();
+		}
+
+		// The action definitions are mandatory
+		ConfigTree actionConfig = p_oP.getFirstChild("Actions");
+		if (actionConfig == null) {
+			throw new ConfigurationException("No 'Actions' configuration.");
+		}
+		actionDefinitionFactory = new ActionDefinitionFactory(DomElement.fromConfigTree(actionConfig));
+
+	} // ________________________________
+
+    /**
+     * Factory method for the command queue.
+     * <p/>
+     * When the configuration has a "command-queue-class" attribute, an
+     * instance of that class is created with its no-argument constructor.
+     * When the attribute is absent, or the class cannot be instantiated (the
+     * failure is logged), the static default command queue is returned, which
+     * may be null.
+     * 
+     * @param config GatewayListener config.
+     * @return GatewayListener CommandQueue instance, possibly null.
+     */
+	private CommandQueue createCommandQueue(ConfigTree config)
+	{
+		String commandQueueClass = config.getAttribute("command-queue-class");
+		if (commandQueueClass == null) {
+			return _defaultCommandQueue;
+		}
+
+		try {
+			Object queue = Class.forName(commandQueueClass).newInstance();
+			return (CommandQueue) queue;
+		} catch (Exception e) {
+			_logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to no Command Queue", e);
+			return _defaultCommandQueue;
+		}
+	}
+
+	/**
+	 * Allows a default command queue to be set statically for all GatewayListener instances.
+	 * @param defaultCommandQueue The defaultCommandQueue to set.
+	 */
+	public static void setDefaultCommandQueue(CommandQueue defaultCommandQueue) {
+		GatewayListenerController._defaultCommandQueue = defaultCommandQueue;
+	}
+
+	/**
+	 * Main execution loop <p/> Will continue to run until either <p/>a) run
+	 * time is expired <p/>b) shutdown command is received in command queue
+	 * <p/>For every child element that contains a GATEWAY_CLASS_TAG
+	 * attribute, this method will try to launch a child thread running an
+	 * object of that class (see tryToLaunchGateway()) <p/>Once all child
+	 * threads are triggered, the main thread will either <p/>1) wait for a
+	 * message in the command queue (if one was configured) until next reload
+	 * or end of run period expired <p/>or 2) Just sleep if there's no command
+	 * queue to listen on
+	 * <p/>When a reload is due but cannot be carried out (no repository entry
+	 * name is known, or the reload failed) the next reload is rescheduled
+	 * _defaultReloadMillis from now; otherwise the loop would spin, launching
+	 * the gateways again and again.
+	 * <p/>On exit the state is Done_OK and the command queue, if there is
+	 * one, is closed.
+	 */
+	public void run() 
+	{
+		while (endNotRequested()) 
+		{
+			_status = State.Running;
+			for (ConfigTree oCurr : _config.getAllChildren()) {
+				String sClass = oCurr.getAttribute(GATEWAY_CLASS_TAG);
+				if (Util.isNullString(sClass))
+					continue;
+				tryToLaunchGateway(oCurr, sClass);
+			}
+
+			waitForCmdOrSleep();
+
+			if (endRequested()) {
+				break;
+			}
+			if (timeToReload()) 
+			{
+				boolean bReloaded = false;
+				if (_sParametersName != null) 
+				{
+					try 
+					{
+						_status = State.Loading_parameters;
+						_logger
+								.info("Reloading parameters _____________________________________________________");
+						ConfigTree oNew = GatewayListenerController.getListenerConfig(_sParametersName);
+						checkParms(oNew);
+						_config = oNew;
+						bReloaded = true;
+					} catch (Exception e) {
+						_logger.error("Failed to reload parameters"
+								+ " - Continuing with cached version", e);
+					}
+				}
+				if (!bReloaded) 
+				{
+					// Nothing was reloaded, so timeToReload() would stay
+					// true forever: clear the request and try again later
+					_reloadRequested = false;
+					_nextReload = System.currentTimeMillis() + _defaultReloadMillis;
+				}
+			}
+		}
+
+		_status = State.Done_OK;
+		_status.m_iCompletionCode = 0;
+		_logger
+				.info("Finishing_____________________________________________________");
+
+		// Close the command queue (there is none unless one was configured)
+		if (null != _commandQueue) 
+		{
+			try {
+				_commandQueue.close();
+			} catch (CommandQueueException e) {
+				_logger.error("Error closing Command Queue.", e);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Instantiate a gateway class by reflection and start it in a new thread.
+	 * Any failure is logged and otherwise ignored, so one faulty gateway does
+	 * not prevent the others from being launched.
+	 * 
+	 * @param p_oP
+	 *            ConfigTree - configuration of the gateway, passed to its
+	 *            constructor
+	 * @param p_sClassName
+	 *            String - name of a Runnable class with one of the public
+	 *            constructors described in findGatewayConstructor()
+	 */
+	private void tryToLaunchGateway(ConfigTree p_oP, String p_sClassName) 
+	{
+		try {
+			Class<?> oListener = Class.forName(p_sClassName);
+			Constructor<?> oConst = findGatewayConstructor(oListener);
+
+			// The shorter signature does not take the action definitions
+			Object[] oaArgs = (3 == oConst.getParameterTypes().length) 
+					? new Object[] { this, p_oP, actionDefinitionFactory }
+					: new Object[] { this, p_oP };
+
+			Runnable oRun = (Runnable) oConst.newInstance(oaArgs);
+			new Thread(oRun).start();
+		} catch (Exception e) {
+			_logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+		}
+	} // ________________________________
+
+	/**
+	 * Find the public constructor used to launch a gateway.
+	 * <p/>
+	 * Class.getConstructor() only matches exact parameter types, so the
+	 * controller type is tried starting with the runtime class of this
+	 * object and walking up to GatewayListenerController. For each type the
+	 * (controller, ConfigTree, ActionDefinitionFactory) signature is
+	 * preferred over (controller, ConfigTree).
+	 * 
+	 * @param p_oListener
+	 *            Class - the gateway class
+	 * @return Constructor - the first matching constructor
+	 * @throws NoSuchMethodException
+	 *             if the class has none of the supported constructors
+	 */
+	private Constructor<?> findGatewayConstructor(Class<?> p_oListener)
+			throws NoSuchMethodException 
+	{
+		Class<?> oCtl = this.getClass();
+		while (null != oCtl) 
+		{
+			try {
+				return p_oListener.getConstructor(oCtl, ConfigTree.class,
+						ActionDefinitionFactory.class);
+			} catch (NoSuchMethodException eNone) {
+				// not declared - try the two argument signature
+			}
+			try {
+				return p_oListener.getConstructor(oCtl, ConfigTree.class);
+			} catch (NoSuchMethodException eNone) {
+				// not declared - try the controller's superclass
+			}
+			if (GatewayListenerController.class.equals(oCtl))
+				break;
+			oCtl = oCtl.getSuperclass();
+		}
+		throw new NoSuchMethodException("No public constructor of <"
+				+ p_oListener.getName() + "> takes a GatewayListenerController"
+				+ " and a ConfigTree (optionally followed by an ActionDefinitionFactory)");
+	} // ________________________________
+
+	/**
+	 * Time left until the controller has to do something.
+	 * 
+	 * @return long - milliseconds from now until the earlier of next reload
+	 *         and end time; zero or negative if that moment has passed
+	 */
+	long millisToWait()
+	{
+		long lDeadline = (_nextReload < _endTime) ? _nextReload : _endTime;
+		long lNow = System.currentTimeMillis();
+		return lDeadline - lNow;
+	} // ________________________________
+
+	/**
+	 * Block the controller thread until there is something to do.
+	 * <p/>
+	 * Without a command queue the thread sleeps for millisToWait(); an
+	 * interrupt ends the sleep and marks the controller as 'end requested'.
+	 * <p/>
+	 * With a command queue, commands are received and processed until the
+	 * time is exhausted, a receive times out (null command), or a processed
+	 * command requests end or reload.
+	 * <p/>
+	 * NOTE(review): a failing receive is only logged and retried at once, so
+	 * a queue that keeps failing makes this loop spin until the deadline.
+	 */
+	private void waitForCmdOrSleep() {
+		if (null == _commandQueue) {
+			long lSleep = millisToWait();
+			_logger.debug("About to sleep " + lSleep);
+			// No command queue nor topic - Just sleep until time
+			// exhausted, or thread interrupted
+			try {
+				if (lSleep > 0)
+					Thread.sleep(lSleep);
+			} catch (InterruptedException e) {
+				_endTime = 0; // mark as end requested and return
+				// Catching the exception cleared the interrupt status:
+				// restore it so the code running this thread can see it
+				Thread.currentThread().interrupt();
+			}
+			return;
+		}
+
+		// Wait for commands until time exhausted or command received
+		// Note that received commands might change time variables (reload/end)
+		// that's why time to go is recalculated on each cycle
+		long lToGo;
+		while ((lToGo = millisToWait()) > 0) {
+			try {
+				_logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+				String oM = _commandQueue.receiveCommand(lToGo);
+				if (null == oM) {
+					return;
+				}
+				processCommand(oM);
+				if (endRequested() || timeToReload()) {
+					break;
+				}
+			} catch (CommandQueueException eJ) {
+				_logger.info("receive on command queue failed", eJ);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Processes the command that has been received in the command queue (or
+	 * topic) <p/>_endRequested, _reloadRequested, and _endTime could be
+	 * changed. Commands are trimmed and matched case-insensitively; anything
+	 * that is not recognised is ignored.
+	 * 
+	 * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+	 * width="400"/> </COLGROUP>
+	 * <TR>
+	 * <TD align="center">message text</TD>
+	 * <TD align="center">effect</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>shutdown*</TD>
+	 * <TD>End is requested immediately - quiesce process will start - Child
+	 * threads will be allowed to finish normally</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>reload param*</TD>
+	 * <TD>A parameter reload is requested, so the listener is reconfigured
+	 * with new values</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+	 * <TD>End time will be set to new value (parsed with _dateFormat). If
+	 * hh:mm:ss is not supplied => end of day assumed (23:59:59). A value
+	 * that cannot be parsed is logged and ignored</TD>
+	 * </TR>
+	 * </TABLE> * startsWith() <p/>
+	 * 
+	 * @param sTxt
+	 *            Text of the command received from the command queue (a null
+	 *            value is ignored).
+	 * 
+	 */
+	private void processCommand(String sTxt) {
+		if (null == sTxt)
+			return;
+		
+		// Use a fixed locale: with the default locale the lower case form of
+		// a command depends on where the JVM runs (e.g. 'I' in Turkish)
+		String sLow = sTxt.trim().toLowerCase(java.util.Locale.ENGLISH);
+		if (sLow.startsWith("shutdown")) {
+			_endRequested = true;
+			_logger.info("Shutdown has been requested");
+			return;
+		}
+		if (sLow.startsWith("reload param")) {
+			_reloadRequested = true;
+			_logger
+					.info("Request for parameter reload has been received");
+			return;
+		}
+		String[] sa = sLow.split("\\s+");
+		if (sa.length > 1 && "endtime".equals(sa[0])) {
+			try {
+				String sDate = sa[1];
+				// No time of day supplied => end of day
+				String sTime = (sa.length < 3) ? "23:59:59" : sa[2];
+				Date oEnd = _dateFormat.parse(sDate + " " + sTime);
+				_logger.info("New end date set to : " + oEnd);
+				_endTime = oEnd.getTime();
+			} catch (Exception eDat) {
+				_logger.info("Problems with endTime command", eDat);
+			}
+		}
+	} // ________________________________
+
+	/**
+	 * Accessor to determine if execution time is expired or shutdown requested
+	 * 
+	 * @return boolean if processing has to stop (all child threads will be
+	 *         allowed to finish)
+	 */
+	public boolean endRequested() {
+		return _endRequested || System.currentTimeMillis() >= _endTime;
+	}
+
+	/**
+	 * Accessor to determine if execution time is not expired, and no shutdown
+	 * request received
+	 * 
+	 * @return boolean - true if run time has not expired and quiesce has not
+	 *         been requested
+	 */
+	public boolean endNotRequested() {
+		return !endRequested();
+	}
+
+	/**
+	 * Provide a common accessor to determine if parameters have to be reloaded
+	 * <p/> For child threads this means thread execution has to end
+	 * </p>
+	 * Child processes should only call this method when they are idle (as
+	 * opposed to in the middle of executing a unit of work)
+	 * 
+	 * @return boolean - true if it's time to reload parameters
+	 */
+	public boolean timeToReload() {
+		return _reloadRequested
+				|| System.currentTimeMillis() >= _nextReload;
+	}
+
+	/**
+	 * Helper accessor for child processes that provides info to determine if
+	 * they can continue with yet another execution cycle
+	 * 
+	 * @return boolean - true if runtime is not expired and not time yet to
+	 *         reload parameters
+	 */
+	public boolean continueLooping() {
+		return (endNotRequested() && !timeToReload());
+	} // ________________________________
+
+	/**
+	 * Read an attribute from a configuration element, falling back to a
+	 * default value when the attribute is not there.
+	 * 
+	 * @param p_oP
+	 *            ConfigTree - look for attributes in this Element only
+	 * @param p_sAtt
+	 *            String - Name of attribute to find
+	 * @param p_sDefault
+	 *            String - value to use if the attribute is not there; null
+	 *            makes the attribute mandatory
+	 * @return String - value of attribute, or the default value
+	 * @throws ConfigurationException -
+	 *             If requested attribute not found and no default value
+	 *             supplied by invoker
+	 */
+	public String obtainAtt(ConfigTree p_oP, String p_sAtt, String p_sDefault)
+			throws ConfigurationException
+	{
+		String sVal = p_oP.getAttribute(p_sAtt);
+		if (null != sVal) {
+			return sVal;
+		}
+		if (null == p_sDefault) {
+			throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+		}
+		return p_sDefault;
+	} // ________________________________
+
+	/**
+	 * Obtain the EPR manager for the directory named by the
+	 * Environment.REGISTRY_FILE_HELPER_DIR property of the core module
+	 * ("." when the property is not set).
+	 * 
+	 * @return EPRManager - as returned by EPRManager.getInstance()
+	 */
+	private static EPRManager getEprManager()
+	{
+		PropertyManager oCore = ModulePropertyManager
+				.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+		return EPRManager.getInstance(
+				oCore.getProperty(Environment.REGISTRY_FILE_HELPER_DIR, "."));
+	}
+	
+	/**
+	 * Load the EPR stored under a name.
+	 * 
+	 * @param name
+	 *            String - name the EPR was saved with
+	 * @return EPR - as returned by the EPR manager
+	 * @throws IOException
+	 *             if the EPR manager cannot load it
+	 */
+	public EPR getEprByName(String name) throws IOException
+	{
+		EPRManager oManager = getEprManager();
+		return oManager.loadEPR(name);
+	} // ________________________________
+
+	/**
+	 * Save an EPR under a name. A failure to save is logged as fatal and not
+	 * propagated to the caller.
+	 * 
+	 * @param name
+	 *            String - name to save the EPR with
+	 * @param address
+	 *            EPR - the endpoint reference to save
+	 */
+	public void register(String name, EPR address)
+	{
+		try
+		{
+			EPRManager oManager = getEprManager();
+			oManager.saveEPR(name, address);
+		}
+		catch (IOException e)
+		{
+			_logger.fatal("Cannot register service", e);
+		}
+	} // ________________________________
+
+	/**
+	 * Remove the EPR saved under a name. A failure to remove it is logged as
+	 * fatal and not propagated to the caller.
+	 * 
+	 * @param name
+	 *            String - name the EPR was saved with
+	 */
+	public void unRegister(String name)
+	{
+		try
+		{
+			EPRManager oManager = getEprManager();
+			oManager.removeEPR(name);
+		}
+		catch (IOException e)
+		{
+			_logger.fatal("Cannot un-register service", e);
+		}
+	} // ________________________________
+
+
+	// Command queue of this controller, assigned by checkParms(); null when
+	// neither the configuration nor the static default provides one
+	private 		CommandQueue _commandQueue;
+	// Fallback used by createCommandQueue(); see setDefaultCommandQueue()
+	private static 	CommandQueue _defaultCommandQueue = null;
+
+	private static Logger	_logger = Logger.getLogger(GatewayListenerController.class);
+	// Repository entry the configuration came from; only set by the String
+	// constructor, and run() only reloads parameters when it is not null
+	private String 			_sParametersName;
+	// Configuration currently in use; replaced after a successful reload
+	private ConfigTree 		_config;
+	// Set by the "reload param" command, cleared by checkParms()
+	private boolean 		_reloadRequested;
+	// Set by the "shutdown" command
+	private boolean			_endRequested;
+	// Both are absolute times in milliseconds (System.currentTimeMillis()
+	// scale); Long.MAX_VALUE stands for 'never'
+	private long 			_nextReload = Long.MAX_VALUE;
+	private long 			_endTime = Long.MAX_VALUE;
+	protected int 			_defaultReloadMillis = 180000; // default interval between parameter reloads
+
+
+	// Format of end times (END_TIME_TAG attribute and "endTime" command).
+	// 'HH' is the 24 hour clock (0-23): with the 12 hour 'hh' a time such
+	// as 12:00:00 would be parsed as midnight instead of noon.
+	// NOTE: SimpleDateFormat is not thread safe - callers outside the
+	// controller thread must synchronize on this object or use their own.
+	public static final SimpleDateFormat _dateFormat 
+		= new SimpleDateFormat("yyyyMMdd HH:mm:ss");
+
+	// Current state, exposed by getState(); null until a constructor that
+	// takes a configuration has run
+	private State _status = null;
+
+
+	// Source of getControllerAttributes(). Nothing in this class assigns it,
+	// so it is null as the class stands
+	private HashMap<String, Object> _attributes;
+} // ____________________________________________________________________________

Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java	2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java	2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,232 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import javax.enterprise.deploy.spi.exceptions.ConfigurationException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.naming.Context;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.helpers.JMSEpr;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.listeners.ListenerTagNames;;
+
+/**
+ * Gateway listener that reads JMS messages from a queue, hands each one to a
+ * reflectively invoked "composer" method that is expected to return an ESB
+ * {@link Message}, and passes that result to {@link #dispatch(Message)}.
+ * <p>
+ * The listener is configured from a {@link ConfigTree} and is driven by a
+ * {@link GatewayListenerController}, which decides how long the polling loop
+ * keeps running and provides registry access for EPRs.
+ */
+public class JmsGatewayListener implements Runnable
+{
+
+	/**
+	 * Stores the collaborators and validates the configuration right away:
+	 * {@link #checkMyParms()} is called from here, and it also opens the JMS
+	 * connection, so construction fails if the configuration is incomplete
+	 * or the queue cannot be reached.
+	 *
+	 * @param commandListener controller that owns this listener
+	 * @param listenerConfig  configuration subtree of this listener
+	 * @throws Exception if a mandatory attribute is missing, the composer class
+	 *                   cannot be set up, or the JMS resources cannot be created
+	 */
+	public JmsGatewayListener(GatewayListenerController commandListener, ConfigTree listenerConfig)
+		throws Exception
+	{
+		_config		= listenerConfig;
+		_controller	= commandListener;
+		_sleepForRetries = 3000;			//  milliseconds
+		// NOTE(review): overridable method called from a constructor - a subclass
+		// override runs before the subclass' own fields are initialised.
+		checkMyParms();
+	} // __________________________________
+
+	/**
+	 * Main loop. Registers the inbound EPR (when one is configured), then keeps
+	 * receiving and processing messages for as long as the controller's
+	 * continueLooping() returns true. On the way out - including when an
+	 * unexpected runtime error escapes the loop - the EPR is un-registered and
+	 * the JMS session and connection are closed.
+	 */
+	public void run()
+	{
+		if (null!=_eprInName)
+			_controller.register(_eprInName,_eprIn);
+
+		try
+		{
+			while (_controller.continueLooping())
+			{
+				javax.jms.Message msgIn = receiveOne();
+				if (null==msgIn)
+					continue;
+				processOne(msgIn);
+			}
+		}
+		finally
+		{
+			// Cleanup lives in a finally block so that the registry entry and the
+			// JMS resources are released even if the loop ends abnormally
+			if (null!=_eprInName)
+				_controller.unRegister(_eprInName);
+			releaseJmsResources();
+		}
+	} // ________________________________
+
+	/**
+	 * Invokes the composer method on one incoming JMS message and dispatches
+	 * the resulting ESB message. Every failure is logged and swallowed so that
+	 * a single bad message cannot stop the listener.
+	 *
+	 * @param msgIn the JMS message just received (never null)
+	 */
+	private void processOne(javax.jms.Message msgIn)
+	{
+		try
+		{
+			Object obj = _processMethod.invoke(_composer,new Object[] {msgIn} );
+			if (null==obj)
+			{
+				_logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+				return;
+			}
+			// Explicit type test instead of provoking a ClassCastException
+			if (!(obj instanceof Message))
+			{
+				_logger.error("Action class method <"+_processMethod.getName()+"> returned a non Message object");
+				return;
+			}
+			dispatch((Message)obj);
+		}
+		catch (InvocationTargetException e)
+		{
+			_logger.error("Problems invoking method <"+_processMethod.getName()+">",e);
+		}
+		catch (IllegalAccessException e)
+		{
+			_logger.error("Problems invoking method <"+_processMethod.getName()+">",e);
+		}
+		catch (Exception e)
+		{
+			_logger.error("Unexpected problem",e);
+		}
+	} // ________________________________
+
+	/**
+	 * Hook that receives every successfully composed ESB message.
+	 * This implementation is empty: the message is dropped.
+	 *
+	 * @param message ESB message produced by the composer method
+	 */
+	protected void dispatch(Message message)
+	{
+
+	} // ________________________________
+
+	/**
+	 * Check for mandatory and optional attributes in parameter tree, load the
+	 * composer class, and open the JMS receiver.
+	 *
+	 * @throws Exception -
+	 *             if mandatory atts are not right or actionClass not in
+	 *             classpath
+	 */
+	protected void checkMyParms() throws Exception
+	{
+		// Third arg is null - Exception will be thrown if attribute is not found
+		_eprOutName	= _controller.obtainAtt(_config, ListenerTagNames.EPR_OUT_NAME, null);
+		_eprOut		= _controller.getEprByName(_eprOutName);
+		if (null==_eprOut)
+			throw new ConfigurationException("EPR <"+_eprOutName+"> not found in registry");
+
+		_queueName 	= _controller.obtainAtt(_config, JMSEpr.DESTINATION_NAME_TAG, null);
+
+		// Look for first "action" element - only first one will be used
+		String tagName = ListenerTagNames.ACTION_ELEMENT;
+		ConfigTree actionElement = _config.getFirstChild(tagName);
+		if (null==actionElement)
+			throw new ConfigurationException("Missing <"+tagName+"> element");
+		// class attribute
+		_composerName	= _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_ATTRIBUTE,null);
+		_composerClass = Class.forName(_composerName);
+		// The composer must offer a public constructor taking a ConfigTree
+		Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+		_composer= oConst.newInstance(_config);
+
+		// From here onwards, all attributes have a default value
+		// process attribute
+		// NOTE(review): the method name is read from the listener config, not from the
+		// action element, and defaults to the tag name itself - confirm this is intended.
+		tagName	= ListenerTagNames.ACTION_PROCESS_METHOD;
+		String sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+		// The method is invoked with the javax.jms.Message that was received, so it has to be
+		// looked up with that parameter type. Looking it up with the ESB Message type (the
+		// imported 'Message') makes every invoke() fail with an argument type mismatch.
+		_processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {javax.jms.Message.class});
+
+		// No problem if selector is null - everything in queue will be returned
+		_messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+
+		prepareMessageReceiver();
+	} // ________________________________
+
+	/**
+	 * (Re)creates the JMS connection, session and receiver for the configured
+	 * queue, and rebuilds the inbound EPR when an inbound EPR name is configured.
+	 * Any connection/session left over from a previous call is closed first, and
+	 * a partially built connection is closed again if the setup fails.
+	 *
+	 * @throws Exception if a JNDI lookup or any JMS call fails
+	 */
+	private void prepareMessageReceiver() throws Exception
+	{
+		// Close leftovers of a previous (possibly broken) connection instead of just
+		// dropping the references, which would leak them on every reconnect
+		releaseJmsResources();
+		_queue = null;
+
+		String sJndiType = _controller.obtainAtt(_config, JMSEpr.JNDI_TYPE_TAG,"jboss");
+		String sJndiURL = _controller.obtainAtt(_config, JMSEpr.JNDI_URL_TAG,"localhost");
+		Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+		String sFactClass = _controller.obtainAtt(_config,JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+		_eprInName	= _config.getAttribute(ListenerTagNames.EPR_IN_NAME);
+		_eprIn = (null==_eprInName) ? null
+				: new JMSEpr(JMSEpr.QUEUE_TYPE,_queueName,sFactClass,sJndiType,sJndiURL,_messageSelector);
+
+		try
+		{
+			Object tmp = oJndiCtx.lookup(sFactClass);
+			QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+			_queueConnection = qcf.createQueueConnection();
+			_queue = (Queue) oJndiCtx.lookup(_queueName);
+			_queueSession = _queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
+			_queueConnection.start();
+
+			_messageReceiver = _queueSession.createReceiver(_queue, _messageSelector);
+		}
+		catch (Exception e)
+		{
+			// Do not leave a half-initialised connection open
+			releaseJmsResources();
+			throw e;
+		}
+	} // ________________________________
+
+	/**
+	 * Best-effort close of the JMS session and connection. Errors while closing
+	 * are only logged at debug level, and the fields are reset to null.
+	 * The receiver reference is deliberately kept: a later receive() on it then
+	 * fails with a JMSException, which {@link #receiveOne()} answers with a reconnect.
+	 */
+	private void releaseJmsResources()
+	{
+		if (null != _queueSession)
+		{
+			try { _queueSession.close(); }
+			catch (Exception e1) { _logger.debug("Ignoring problem closing JMS session",e1); }
+			_queueSession = null;
+		}
+		if (null != _queueConnection)
+		{
+			try { _queueConnection.close(); }
+			catch (Exception e2) { _logger.debug("Ignoring problem closing JMS connection",e2); }
+			_queueConnection = null;
+		}
+	} // ________________________________
+
+	/**
+	 * Receive one message, trying to reconnect to the queue whenever the
+	 * receive fails with a JMS error. Keeps trying until the controller
+	 * reports that the end was requested.
+	 *
+	 * @return javax.jms.Message - One input message, or null when the receive
+	 *         timed out, the end was requested, or the retry sleep was interrupted
+	 */
+	protected javax.jms.Message receiveOne()
+	{
+		// Loop while the end has NOT been requested. Looping while it HAS been requested
+		// returns null immediately during normal operation, which turns the
+		// caller's loop into a busy spin that never receives anything.
+		while (!_controller.endRequested())
+		{
+			try
+			{
+				return _messageReceiver.receive(_controller.millisToWait());
+			}
+			catch (JMSException oJ)
+			{
+				_logger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
+				try { prepareMessageReceiver(); }
+				// try to reconnect to the queue
+				catch (Exception e)
+				{
+					_logger.error("Reconnecting to Queue", e);
+					try { Thread.sleep(_sleepForRetries); }
+					catch (InterruptedException e1)
+					{ // Just return
+						// Log the interrupt itself, not the reconnect failure
+						// NOTE(review): the interrupt flag is not restored here; restoring it
+						// would also require the loop in run() to stop on interruption.
+						_logger.error("Unexpected thread interupt exception.", e1);
+						return null;
+					}
+				}
+			}
+		}
+		return null;
+	} //________________________________
+
+	protected final static Logger _logger = Logger.getLogger(JmsGatewayListener.class);
+
+	// JMS side: queue name, live connection/session/queue and the receiver polled by receiveOne()
+	protected String			_queueName;
+	protected QueueConnection 	_queueConnection;
+	protected QueueSession 		_queueSession;
+	protected Queue 			_queue;
+	protected MessageConsumer 	_messageReceiver;
+	protected String 			_messageSelector;	// may be null: no filtering
+	protected ConfigTree 		_config;
+	protected GatewayListenerController _controller;
+	protected final long 		_sleepForRetries;   //  milliseconds
+
+	// Names and EPRs: the inbound one is optional, the outbound one is mandatory
+	protected String			_eprInName	,_eprOutName;
+	protected EPR				_eprIn		,_eprOut;
+
+	// Reflectively loaded composer and the method invoked on it for every JMS message
+	protected String			_composerName;
+	protected Class 			_composerClass;
+	protected Object			_composer;
+	protected Method			_processMethod;
+} 




More information about the jboss-svn-commits mailing list