[jboss-svn-commits] JBL Code SVN: r6910 - in labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb: . dispatchers listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Oct 19 09:42:46 EDT 2006
Author: estebanschifman
Date: 2006-10-19 09:42:40 -0400 (Thu, 19 Oct 2006)
New Revision: 6910
Added:
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java
Log:
Classes needed for dispatching
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/Dispatcher.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.message.Message;
+
+public abstract class Dispatcher
+{
+
+ public abstract boolean dispatch(Message message) throws DispatcherException;
+
+ // no default constructor
+ private Dispatcher() {}
+
+ // protected constructor
+ protected Dispatcher (EPR epr)
+ {
+ _epr = epr;
+ }
+
+
+ EPR _epr;
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherException.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Dispatch Exception.
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+public class DispatcherException extends BaseException
+{
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Construct an exception instance.
+ * @param message Exception message.
+ */
+ public DispatcherException(String message) { super(message); }
+
+ /**
+ * Construct an exception instance.
+ * @param message Exception message.
+ * @param cause Exception cause.
+ */
+ public DispatcherException(String message, Throwable cause) { super(message, cause); }
+
+ /**
+ * Construct an exception instance.
+ * @param cause Exception cause.
+ */
+ public DispatcherException(Throwable cause) { super(cause); }
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/DispatcherFactory.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import java.net.URISyntaxException;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.helpers.*;
+
+public class DispatcherFactory
+{
+ // private default constructor
+ private DispatcherFactory() {}
+
+ public static DispatcherFactory getInstance(Object obj)
+ {
+ return _instance;
+ }
+
+ public static Dispatcher getDispatcher(EPR epr) throws DispatcherException
+ {
+ try
+ {
+ String address = epr.getAddr().getAddress();
+ if (address.startsWith(JMSEpr.JMS_PROTOCOL))
+ return new JmsDispatcher(epr);
+ }
+ catch (URISyntaxException e) { throw new DispatcherException(e); }
+ throw new DispatcherException("Unknown protocol");
+ }
+
+ private static final DispatcherFactory _instance = new DispatcherFactory();
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/dispatchers/JmsDispatcher.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.dispatchers;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.message.Message;
+
+public class JmsDispatcher extends Dispatcher
+{
+ JmsDispatcher(EPR epr)
+ {
+ super(epr);
+ }
+
+ @Override
+ public boolean dispatch(Message message) throws DispatcherException
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+}
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GatewayListenerController.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,550 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.util.EPRManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+public class GatewayListenerController implements Runnable
+{
+
+ public static void main(String[] args) throws Exception
+ {
+ GatewayListenerController oProc = new GatewayListenerController(args[0]);
+ oProc.run();
+ GatewayListenerController.State oS = oProc.getState();
+
+ if (null != oS.getException()) {
+ _logger.error("GatewayListener <" + args[0] + "> FAILED\n", oS
+ .getException());
+ }
+ System.exit(oS.getCompletionCode());
+ } // ________________________________
+
+ public static final String RELOAD_SECONDS_TAG = "parameterReloadSecs";
+ public static final String END_TIME_TAG = "endTime";
+
+ // Attribute name that denotes listener class to be instantiated in a child thread
+ // This attribute is not in the root node but in first level child ConfigTrees
+ public static final String GATEWAY_CLASS_TAG = "gatewayClass";
+
+ /**
+ * Obtain a shallow copy of needed atributes in this object's last loaded
+ * parameter tree <p/>The local bject is cloned so child threads can use it
+ * as they choose to without interfering with the environment
+ * <p />
+ * Listener processes controlled by this object should keep a reference to
+ * this object at construction time, and not call this method again unless
+ * they specifically need updated values. Parameter reload could have
+ * happened since last call
+ *
+ * @return Map - a shallow copy of the attributes Map
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getControllerAttributes()
+ {
+ return (Map<String, Object>) _attributes.clone();
+ }
+
+ public State getState() { return _status; }
+
+ public static enum State
+ {
+ Loading_parameters, Running, Shutting_down, Done_OK, Exception_thrown;
+ int m_iCompletionCode = 0;
+
+ Exception m_oException = null;
+
+ public int getCompletionCode() {
+ return m_iCompletionCode;
+ };
+
+ public Exception getException() {
+ return m_oException;
+ }
+ };
+
+ private ActionDefinitionFactory actionDefinitionFactory;
+
+ /**
+ * Package pivate default constructor.
+ */
+ protected GatewayListenerController() { }
+
+ /**
+ * Construct a Listener Manager from the named repository based
+ * configuration.
+ *
+ * @param p_sParameterName
+ * Name of the Repository entry containing the configuration.
+ * @throws Exception
+ * Unable to load/use the named configuration.
+ */
+ public GatewayListenerController(String p_sParameterName) throws Exception {
+ this(GatewayListenerController.getListenerConfig(p_sParameterName));
+ _sParametersName = p_sParameterName;
+ }
+
+ /**
+ * Construct a Listener Manager using the specified listener configuration.
+ *
+ * @param config
+ * The configuration.
+ * @throws Exception
+ * Unable to load/use the supplied configuration.
+ */
+ public GatewayListenerController(ConfigTree config) throws Exception {
+ _config = config;
+ _status = State.Loading_parameters;
+
+ try { checkParms(_config); }
+ catch (Exception e)
+ {
+ String configSource = config.getAttribute("configSource");
+ _status = State.Exception_thrown;
+ _status.m_oException = e;
+ _logger.fatal("Listener configuration and startup error. Config Source: "
+ + (configSource != null ? configSource
+ : "unknown"), e);
+ throw e;
+ }
+ }
+
+ /**
+ * Load the named listener configuration from the configured parameter
+ * repository.
+ *
+ * @param reposParam
+ * The name of the repository entry containing the Listener
+ * configuration.
+ * @return Listener Configuration as {@link ConfigTree}.
+ * @throws IOException
+ * Unable to access the repository.
+ * @throws ParamRepositoryException
+ * Unable to access the configuration in the repository.
+ * @throws SAXException
+ * Unable to parse the configuration.
+ */
+ private static ConfigTree getListenerConfig(String reposParam)
+ throws IOException, ParamRepositoryException, SAXException {
+ String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+ ConfigTree config = ConfigTree.fromXml(sXml);
+
+ config.setAttribute("configSource", "param-repository:" + reposParam);
+
+ return config;
+ }
+
+ /**
+ * Check to see if all needed parameters are there, and assign default
+ * values to some of them
+ *
+ * @param p_oP
+ * ConfigTree - Where to look for the mandatory/optional
+ * configuration attributes
+ * @throws Exception -
+ * If attributes are wrong or not enough for a proper runtime
+ * configuration
+ */
+ public void checkParms(ConfigTree p_oP) throws Exception {
+ // We've just loaded - set to false until next reload requested
+ _reloadRequested = false;
+ _commandQueue = createCommandQueue(p_oP);
+
+ // Open the command queue...
+ if (null!=_commandQueue)
+ _commandQueue.open(DomElement.fromConfigTree(p_oP));
+
+ // if RELOAD_SECONDS_TAG not set, and no command queue
+ // then reload every 10 minutes
+ // If there is a command queue, run until command is received
+ String sRldSecs = p_oP.getAttribute(RELOAD_SECONDS_TAG);
+ _nextReload = (null != sRldSecs)
+ ? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
+ : (null == _commandQueue)
+ ? Long.MAX_VALUE
+ : System.currentTimeMillis() + _defaultReloadMillis;
+
+ // if END_TIME_TAG not set try to run forever
+ // not a good practice if command queue is not set
+ // Expected date format is "yyyyMMdd hh:mm:ss"
+ String sEndT = p_oP.getAttribute(END_TIME_TAG);
+ _endTime = (null == sEndT) ? Long.MAX_VALUE : _dateFormat.parse(
+ sEndT).getTime();
+
+ // Read and initialise the action definitions...
+ ConfigTree actionConfig = p_oP.getFirstChild("Actions");
+ if(actionConfig == null) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+ actionDefinitionFactory = new ActionDefinitionFactory(DomElement.fromConfigTree(actionConfig));
+
+ } // ________________________________
+
+ /**
+ * Factory method for creating the command queue.
+ * @param config GatewayListener config.
+ * @return GatewayListener CommandQueue instance.
+ */
+ private CommandQueue createCommandQueue(ConfigTree config) {
+ String commandQueueClass = config.getAttribute("command-queue-class");
+
+ if(commandQueueClass != null) {
+ try {
+ return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+ } catch (Exception e) {
+ _logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
+ }
+ }
+
+ return _defaultCommandQueue;
+ }
+
+ /**
+ * Allows a default command queue to be set statically for all GatewayListener instances.
+ * @param defaultCommandQueue The defaultCommandQueue to set.
+ */
+ public static void setDefaultCommandQueue(CommandQueue defaultCommandQueue) {
+ GatewayListenerController._defaultCommandQueue = defaultCommandQueue;
+ }
+
+ /**
+ * Main execution loop <p/> Will continue to run until either <p/>a) run
+ * time is expired <p/>b) quiesce command is received in command queue
+ * <p/>For every child element that contains a PARM_LISTENER_CLASS
+ * attribute, this method will try to launch a child thread instantiating an
+ * object of that class, and will call it's run() method <p/>Once all child
+ * processes are trigered, the main thread will either <p/>1) wait for a
+ * message in the command queue (if one was configured) until next reload or
+ * end of run period expired <p/>or 2) Just sleep if there's no command
+ * queue to listen on
+ */
+ public void run()
+ {
+ while (endNotRequested())
+ {
+ _status = State.Running;
+ for (ConfigTree oCurr : _config.getAllChildren()) {
+ String sClass = oCurr.getAttribute(GATEWAY_CLASS_TAG);
+ if (Util.isNullString(sClass))
+ continue;
+ tryToLaunchGateway(oCurr, sClass);
+ }
+
+ waitForCmdOrSleep();
+
+ if (endRequested()) {
+ break;
+ }
+ if (_sParametersName != null && timeToReload())
+ {
+ try
+ {
+ _status = State.Loading_parameters;
+ _logger
+ .info("Reloading parameters _____________________________________________________");
+ ConfigTree oNew = GatewayListenerController.getListenerConfig(_sParametersName);
+ checkParms(oNew);
+ _config = oNew;
+ } catch (Exception e) {
+ _logger.error("Failed to reload parameters"
+ + " - Continuing with cached version", e);
+ }
+ }
+ }
+ // _status = State.Shutting_down;
+
+ _status = State.Done_OK;
+ _status.m_iCompletionCode = 0;
+ _logger
+ .info("Finishing_____________________________________________________");
+
+ // Close the command queue...
+ try {
+ _commandQueue.close();
+ } catch (CommandQueueException e) {
+ _logger.error("Error closing Command Queue.", e);
+ }
+ } // ________________________________
+
+ private void tryToLaunchGateway(ConfigTree p_oP, String p_sClassName)
+ {
+ try {
+ Class oListener = Class.forName(p_sClassName);
+ Constructor oConst = oListener.getConstructor(new Class[] {
+ this.getClass(), ConfigTree.class, ActionDefinitionFactory.class });
+ Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
+ p_oP, actionDefinitionFactory });
+ new Thread(oRun).start();
+ } catch (Exception e) {
+ _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ }
+ } // ________________________________
+
+ long millisToWait() {
+ return Math.min(_nextReload, _endTime) - System.currentTimeMillis();
+ } // ________________________________
+
+ private void waitForCmdOrSleep() {
+ long lToGo = millisToWait();
+
+ if (null == _commandQueue) {
+ _logger.debug("About to sleep " + lToGo);
+ // No command queue nor topic - Just sleep until time
+ // exhausted, or thread interrupted
+ try {
+ if (lToGo > 0)
+ Thread.sleep(lToGo);
+ } catch (InterruptedException e) {
+ _endTime = 0; // mark as end requested and return
+ }
+ return;
+ }
+
+ // Wait for commands until time exhausted or command received
+ // Note that received commands might change time variables (reload/end)
+ // that's why time to go is recalculated on each cycle
+ while ((lToGo = millisToWait()) > 0) {
+ try {
+ _logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = _commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
+ return;
+ }
+ processCommand(oM);
+ if (endRequested() || timeToReload()) {
+ break;
+ }
+ } catch (CommandQueueException eJ) {
+ _logger.info("receive on command queue failed", eJ);
+ }
+ }
+ } // ________________________________
+
+ /**
+ * Processes the command that has been received in the command queue (or
+ * topic) <p/>_endRequested, _reloadRequested, and _endTime could be
+ * changed
+ *
+ * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+ * width="400"/> </COLGROUP>
+ * <TR>
+ * <TD align="center">message text</TD>
+ * <TD align="center">effect</TD>
+ * </TR>
+ * <TR>
+ * <TD>shutdown*</TD>
+ * <TD>End time will be immediately set to 'now' - quiesce process will
+ * start - Child threads will be allowed to finish normally</TD>
+ * </TR>
+ * <TR>
+ * <TD>reload param*</TD>
+ * <TD>Parameters will be immediately reloaded, and listener reconfigured
+ * with new values</TD>
+ * </TR>
+ * <TR>
+ * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+ * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+ * end of day assumed (23:59:59)</TD>
+ * </TR>
+ * </TABLE> * startsWith() <p/>
+ *
+ * @param p_oMsg
+ * Message received from the command queue.
+ *
+ */
+ private void processCommand(String sTxt) {
+ if (null == sTxt)
+ return;
+
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ _endRequested = true;
+ _logger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param")) {
+ _reloadRequested = true;
+ _logger
+ .info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = _dateFormat.parse(sDate + " " + sTime);
+ _logger.info("New end date set to : " + oEnd);
+ _endTime = oEnd.getTime();
+ } catch (Exception eDat) {
+ _logger.info("Problems with endTime command", eDat);
+ }
+ }
+ } // ________________________________
+
+ /**
+ * Accessor to determine if execution time is expired or shutdown requested
+ *
+ * @return boolean if processing has to stop (all child threads will be
+ * allowed to finish)
+ */
+ public boolean endRequested() {
+ return _endRequested || System.currentTimeMillis() >= _endTime;
+ }
+
+ /**
+ * Accessor to determine if execution time is not expired, and no shutdown
+ * request received
+ *
+ * @return boolean - true if run time has not expired and quiesce has not
+ * been requested
+ */
+ public boolean endNotRequested() {
+ return !endRequested();
+ }
+
+ /**
+ * Provide a common accessor to determine if parameters have to be reloaded
+ * <p/> For child threads this means thread execution has to end
+ * </p>
+ * Child processes should only call this method when they are idle (as
+ * opposed to in the middle of executing a unit of work)
+ *
+ * @return boolean - true if it's time to reload parameters
+ */
+ public boolean timeToReload() {
+ return _reloadRequested
+ || System.currentTimeMillis() >= _nextReload;
+ }
+
+ /**
+ * Helper accessor for child processes that provides info to determine if
+ * they can continue with yet another execution cycle
+ *
+ * @return boolean - true if runtime is not expired and not time yet to
+ * reload parameters
+ */
+ public boolean continueLooping() {
+ return (endNotRequested() && !timeToReload());
+ } // ________________________________
+
+ /**
+ * Find an attribute in the tree (arg 0) or assign default value (arg 2)
+ *
+ * @param p_oP
+ * ConfigTree - look for attributes in this Element only
+ * @param p_sAtt
+ * String - Name of attribute to find
+ * @param p_sDefault
+ * String -default value if requested attribute is not there
+ * @return String - value of attribute, or default value (if null)
+ * @throws Exception -
+ * If requested attribute not found and no default value
+ * supplied by invoker
+ */
+ public String obtainAtt(ConfigTree p_oP, String p_sAtt, String p_sDefault)
+ throws ConfigurationException {
+ String sVal = p_oP.getAttribute(p_sAtt);
+ if ((null == sVal) && (null == p_sDefault))
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+
+ private static EPRManager getEprManager()
+ {
+ PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+ String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
+ return EPRManager.getInstance(sDir);
+ }
+
+ public EPR getEprByName (String name) throws IOException
+ {
+ return getEprManager().loadEPR(name);
+ } // ________________________________
+
+ public void register (String name, EPR address)
+ {
+ try { getEprManager().saveEPR(name,address); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot register service",e);
+ }
+ } // ________________________________
+
+ public void unRegister (String name)
+ {
+ try { getEprManager().removeEPR(name); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot un-register service",e);
+ }
+ } // ________________________________
+
+
+ private CommandQueue _commandQueue;
+ private static CommandQueue _defaultCommandQueue = null;
+
+ private static Logger _logger = Logger.getLogger(GatewayListenerController.class);
+ private String _sParametersName;
+ private ConfigTree _config;
+ private boolean _reloadRequested;
+ private boolean _endRequested;
+ private long _nextReload = Long.MAX_VALUE;
+ private long _endTime = Long.MAX_VALUE;
+ protected int _defaultReloadMillis = 180000; // default interval between parameter reloads
+
+
+ public static final SimpleDateFormat _dateFormat
+ = new SimpleDateFormat("yyyyMMdd hh:mm:ss");
+
+ private State _status = null;
+
+
+ private HashMap<String, Object> _attributes;
+} // ____________________________________________________________________________
Added: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java 2006-10-19 12:58:18 UTC (rev 6909)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsGatewayListener.java 2006-10-19 13:42:40 UTC (rev 6910)
@@ -0,0 +1,232 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import javax.enterprise.deploy.spi.exceptions.ConfigurationException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.naming.Context;
+
+import org.apache.log4j.*;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.helpers.JMSEpr;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.listeners.ListenerTagNames;;
+
+public class JmsGatewayListener implements Runnable
+{
+
+ public JmsGatewayListener(GatewayListenerController commandListener, ConfigTree listenerConfig)
+ throws Exception
+ {
+ _config = listenerConfig;
+ _controller = commandListener;
+ _sleepForRetries = 3000; // milliseconds
+ checkMyParms();
+ } // __________________________________
+
+ public void run()
+ {
+ if (null!=_eprInName)
+ _controller.register(_eprInName,_eprIn);
+
+ while (_controller.continueLooping())
+ {
+ javax.jms.Message msgIn = receiveOne();
+ if (null!=msgIn)
+ try
+ {
+ Object obj = _processMethod.invoke(_composer,new Object[] {msgIn} );
+ if (null==obj)
+ {
+ _logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+ continue;
+ }
+ Message message = null;
+ try { message = (org.jboss.soa.esb.message.Message)obj; }
+ catch (ClassCastException e)
+ {
+ _logger.error("Action class method <"+_processMethod.getName()+"> returned a non Message object");
+ continue;
+ }
+ dispatch(message);
+ continue;
+ }
+ catch (InvocationTargetException e)
+ {
+ _logger.error("Problems invoking method <"+_processMethod.getName()+">",e);
+ }
+ catch (IllegalAccessException e)
+ {
+ _logger.error("Problems invoking method <"+_processMethod.getName()+">",e);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected problem",e);
+ }
+ }
+
+ if (null!=_eprInName)
+ _controller.unRegister(_eprInName);
+
+ if (null != _queueSession)
+ try { _queueSession.close(); }
+ catch (Exception e1) {/* Tried my best - Just continue */ }
+ if (null != _queueConnection)
+ try { _queueConnection.close(); }
+ catch (Exception e2) {/* Tried my best - Just continue */ }
+ } // ________________________________
+
+ protected void dispatch(Message message)
+ {
+
+ } // ________________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws Exception
+ {
+ // Third arg is null - Exception will be thrown if attribute is not found
+ _eprOutName = _controller.obtainAtt(_config, ListenerTagNames.EPR_OUT_NAME, null);
+ _eprOut = _controller.getEprByName(_eprOutName);
+ if (null==_eprOut)
+ throw new ConfigurationException("EPR <"+_eprOutName+"> not found in registry");
+
+ _queueName = _controller.obtainAtt(_config, JMSEpr.DESTINATION_NAME_TAG, null);
+
+ // Look for first "action" element - only first one will be used
+ String tagName = ListenerTagNames.ACTION_ELEMENT;
+ ConfigTree actionElement = _config.getFirstChild(tagName);
+ if (null==actionElement)
+ throw new ConfigurationException("Missing <"+tagName+"> element");
+ // class attribute
+ _composerName = _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_ATTRIBUTE,null);
+ _composerClass = Class.forName(_composerName);
+ Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+ _composer= oConst.newInstance(_config);
+
+ // From here onwards, all attributes have a default value
+ // process attribute
+ tagName = ListenerTagNames.ACTION_PROCESS_METHOD;
+ String sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+ _processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {Message.class});
+
+ // No problem if selector is null - everything in queue will be returned
+ _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+
+ prepareMessageReceiver();
+ } // ________________________________
+
+ private void prepareMessageReceiver() throws Exception
+ {
+ _queueConnection = null;
+ _queueSession = null;
+ _queue = null;
+
+ String sJndiType = _controller.obtainAtt(_config, JMSEpr.JNDI_TYPE_TAG,"jboss");
+ String sJndiURL = _controller.obtainAtt(_config, JMSEpr.JNDI_URL_TAG,"localhost");
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+ String sFactClass = _controller.obtainAtt(_config,JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ _eprInName = _config.getAttribute(ListenerTagNames.EPR_IN_NAME);
+ _eprIn = (null==_eprInName) ? null
+ : new JMSEpr(JMSEpr.QUEUE_TYPE,_queueName,sFactClass,sJndiType,sJndiURL,_messageSelector);
+
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+ _queueConnection = qcf.createQueueConnection();
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ _queueSession = _queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
+ _queueConnection.start();
+
+ _messageReceiver = _queueSession.createReceiver(_queue, _messageSelector);
+
+ } // ________________________________
+
+ /**
+ * Receive one message and retry if connection
+ * @return javax.jms.Message - One input message, or null
+ */
+ protected javax.jms.Message receiveOne()
+ {
+ while (_controller.endRequested())
+ try
+ {
+ return _messageReceiver.receive(_controller.millisToWait());
+ }
+ catch (JMSException oJ)
+ {
+ _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
+ try { prepareMessageReceiver(); }
+ // try to reconnect to the queue
+ catch (Exception e)
+ {
+ _logger.error("Reconnecting to Queue", e);
+ try { Thread.sleep(_sleepForRetries); }
+ catch (InterruptedException e1)
+ { // Just return
+ _logger.error("Unexpected thread interupt exception.", e);
+ return null;
+ }
+ }
+ }
+ return null;
+ } //________________________________
+
+ protected final static Logger _logger = Logger.getLogger(JmsGatewayListener.class);
+
+ protected String _queueName;
+ protected QueueConnection _queueConnection;
+ protected QueueSession _queueSession;
+ protected Queue _queue;
+ protected MessageConsumer _messageReceiver;
+ protected String _messageSelector;
+ protected ConfigTree _config;
+ protected GatewayListenerController _controller;
+ protected final long _sleepForRetries; // milliseconds
+
+ protected String _eprInName ,_eprOutName;
+ protected EPR _eprIn ,_eprOut;
+
+ protected String _composerName;
+ protected Class _composerClass;
+ protected Object _composer;
+ protected Method _processMethod;
+}
More information about the jboss-svn-commits
mailing list