[jboss-svn-commits] JBL Code SVN: r8301 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: . internal internal/soa internal/soa/esb internal/soa/esb/listeners soa/esb/listeners soa/esb/listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Dec 13 14:59:40 EST 2006
Author: estebanschifman
Date: 2006-12-13 14:59:34 -0500 (Wed, 13 Dec 2006)
New Revision: 8301
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
New ListenerManager and implementation classes (unified controller for Gateways and ESB aware listeners)
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,636 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.internal.soa.esb.listeners;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.util.EPRManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.State;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+/**
+ * Default implementation of the {@link org.jboss.soa.esb.listeners.ListenerManager} interface.
+ * <p/>
+ * Controlling class that will launch listener child threads for supported
+ * transport listener classes, as indicated in the configuration XML tree used in the constructor
+ * If you use the 'main' method, configuration file is expected in arg[0]
+ *
+ * <p />
+ * Can be launched as uppermost controller (it has a main(args) method)
+ * <p />
+ * Also implements Runnable, and can thus be launched in a child thread from an
+ * upper controlling process
+ * <p />
+ * Listens on a JMS queue (with an optional message selector) for commands (e.g.
+ * Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p />
+ * Parameter reloading can also be set using the PARM_RELOAD_SECS attribute
+ * <p />
+ * End time for this instance can also be set using the PARM_END_TIME attribute
+ * <p />
+ * Constructor will not return until the controller is up and running completely, with
+ * all managed listeners fully initialised.
+ *
+ * @since Version 4.0
+ */
+
+public class DefaultListenerManager implements ListenerManager
+{
+ /**
+ * Construct a Listener Manager and configure it using the named repository entry
+ *
+ * @param parametersName
+ * Name of the Repository entry containing the configuration.
+ * @throws Exception
+ * Unable to load/use the named configuration.
+ */
+ public DefaultListenerManager(String parametersName) throws Exception
+ {
+ this(DefaultListenerManager.getListenerConfig(parametersName));
+ _parametersName = parametersName;
+ } //____________________________
+
+ /**
+ * Construct a Listener Controller using the specified listener configuration.
+ *
+ * @param config
+ * The configuration.
+ * @throws Exception
+ * Unable to load/use the supplied configuration.
+ */
+ public DefaultListenerManager(ConfigTree config) throws Exception
+ {
+ _pauseTimeMillis = 50;
+ _config = config;
+ _status = State.Loading_parameters;
+
+ try { checkParms(_config); }
+ catch (Exception e)
+ {
+ String configSource = config.getAttribute("configSource");
+
+ _status = State.Exception_thrown;
+ _status.setThrowable(e);
+ _logger.fatal("Listener configuration and startup error. Config Source: "
+ + (configSource != null ? configSource
+ : "unknown"), e);
+
+ throw e;
+ }
+ } //____________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
+ */
+ /**
+ * Load the named listener configuration from the configured parameter
+ * repository.
+ *
+ * @param reposParam
+ * The name of the repository entry containing the Listener
+ * configuration.
+ * @return Listener Configuration as {@link ConfigTree}.
+ * @throws IOException
+ * Unable to access the repository.
+ * @throws ParamRepositoryException
+ * Unable to access the configuration in the repository.
+ * @throws SAXException
+ * Unable to parse the configuration.
+ */
+ private static ConfigTree getListenerConfig(String reposParam)
+ throws IOException, ParamRepositoryException, SAXException {
+ String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+ ConfigTree config = ConfigTree.fromXml(sXml);
+
+ config.setAttribute("configSource", "param-repository:" + reposParam);
+
+ return config;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
+ */
+ public void checkParms(ConfigTree p_oP) throws Exception
+ {
+ // We've just loaded - set to false until next reload requested
+ _reloadRequested = false;
+ if (null!=_parametersName)
+ {
+ File file = new File(_parametersName);
+ if (file.exists())
+ _paramFileTimeStamps.put(_parametersName, file.lastModified());
+ }
+
+
+ _commandQueue = createCommandQueue(p_oP);
+
+ // Open the command queue...
+ if (null!=_commandQueue)
+ _commandQueue.open(p_oP);
+
+ setNextReloadTime(p_oP);
+
+ // if PARM_END_TIME not set try to run forever
+ // not a good practice if command queue is not set
+ // Expected date format is "yyyyMMdd hh:mm:ss"
+ String sEndT = p_oP.getAttribute(PARM_END_TIME);
+ _endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+ sEndT).getTime();
+
+ } // ________________________________
+
+ private void setNextReloadTime(ConfigTree tree)
+ {
+ // if RELOAD_SECONDS_TAG not set, and no command queue
+ // then reload every 10 minutes
+ // If there is a command queue, run until command is received
+
+ _sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
+ synchronized (_synchReload)
+ {
+ _nextReload = (null != _sRldSecs)
+ ? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
+ : (null == _commandQueue)
+ ? Long.MAX_VALUE
+ : System.currentTimeMillis() + _defaultReloadIntervalMillis;
+ }
+
+
+ if (null==_sRldSecs)
+ {
+ String sMsg = (null==_commandQueue)
+ ? " - Using default of "+_sRldSecs
+ : " - Listener will run until stopped by command sent to queue";
+ _logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
+ }
+
+ } //____________________________
+
+ /**
+ * Factory method for creating the command queue.
+ * @param config DefaultListenerManager config.
+ * @return DefaultListenerManager CommandQueue instance.
+ */
+ private CommandQueue createCommandQueue(ConfigTree config)
+ {
+ String commandQueueClass = config.getAttribute("command-queue-class");
+ if(commandQueueClass != null)
+ {
+ try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
+ catch (Exception e)
+ {
+ _logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
+ }
+ }
+
+ return null;
+ } //____________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
+ */
+ public void run()
+ {
+ boolean relaunch = true;
+ while (endNotRequested())
+ {
+ _status = State.Running;
+ if (relaunch)
+ {
+ synchronized (_synchAllReady)
+ {
+ _childrenStarted = new HashMap<Class, Boolean>();
+ for (ConfigTree oCurr : _config.getAllChildren())
+ {
+ String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
+ if (Util.isNullString(sClass))
+ continue;
+ tryToLaunchChildListener(oCurr, sClass);
+ }
+ }
+ waitUntilReady();
+ }
+ _status = State.Ready;
+
+ waitForCmdOrSleep();
+
+ if (endRequested())
+ {
+ break;
+ }
+
+ relaunch = false;
+ if (_parametersName != null && isReloadNeeded())
+ try
+ {
+ _status = State.Loading_parameters;
+ _logger.debug("Reloading parameters _____________________________________________________");
+ ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
+ checkParms(oNew);
+ _config = oNew;
+ relaunch = true;
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to reload parameters"
+ + " - Continuing with cached version", e);
+ }
+ }
+ // m_oState = State.Shutting_down;
+
+ _status = State.Done_OK;
+ _status.setCompletionCode(0);
+ _logger.debug("Finishing_____________________________________________________");
+
+ // Close the command queue...
+ try {
+ if (null != _commandQueue)
+ _commandQueue.close();
+ } catch (CommandQueueException e) {
+ _logger.error("Error closing Command Queue.", e);
+ }
+ } // ________________________________
+
+ private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName)
+ {
+ try
+ {
+ Class oListener = Class.forName(p_sClassName);
+ Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
+ Runnable listener = (Runnable)oConst.newInstance(new Object[] { this,p_oP});
+
+ // if you wish to monitor that child listener is ready, make it extend Observable
+ // and notify its observers with a Boolean.TRUE when it's ready
+ if (listener instanceof Observable)
+ {
+ _childrenStarted.put(listener.getClass(), Boolean.FALSE);
+ ((Observable)listener).addObserver(this);
+ }
+
+ new Thread(listener).start();
+ }
+ catch (Throwable e)
+ {
+ _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ }
+ } // ________________________________
+
+ public long millisToWait()
+ {
+ synchronized (_synchReload)
+ {
+ return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
+ }
+ } // ________________________________
+
+ private void waitForCmdOrSleep() {
+ long lToGo = millisToWait();
+
+ if (null == _commandQueue) {
+ _logger.debug("About to sleep " + lToGo);
+ // No command queue nor topic - Just sleep until time
+ // exhausted, or thread interrupted
+ try {
+ while ((lToGo=millisToWait()) > 0)
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ _endTimeStamp = 0; // mark as end requested and return
+ }
+ return;
+ }
+
+ // Wait for commands until time exhausted or command received
+ // Note that received commands might change time variables (reload/end)
+ // that's why time to go is recalculated on each cycle
+ while ((lToGo = millisToWait()) > 0) {
+ try {
+ _logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = _commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
+ return;
+ }
+ processCommand(oM);
+ if (endRequested() || isReloadNeeded()) {
+ break;
+ }
+ } catch (CommandQueueException eJ) {
+ _logger.info("receive on command queue failed", eJ);
+ }
+ }
+ } // ________________________________
+
+ /**
+ * Processes the command that has been received in the command queue (or
+ * topic) <p/>_endRequested, _reloadRequested, and _endTimeStamp could be
+ * changed
+ *
+ * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+ * width="400"/> </COLGROUP>
+ * <TR>
+ * <TD align="center">message text</TD>
+ * <TD align="center">effect</TD>
+ * </TR>
+ * <TR>
+ * <TD>shutdown*</TD>
+ * <TD>End time will be immediately set to 'now' - quiesce process will
+ * start - Child threads will be allowed to finish normally</TD>
+ * </TR>
+ * <TR>
+ * <TD>reload param*</TD>
+ * <TD>Parameters will be immediately reloaded, and listener reconfigured
+ * with new values</TD>
+ * </TR>
+ * <TR>
+ * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+ * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+ * end of day assumed (23:59:59)</TD>
+ * </TR>
+ * </TABLE> * startsWith() <p/>
+ *
+ * @param sTxt
+ * Text of the command received from the command queue.
+ *
+ */
+ private void processCommand(String sTxt)
+ {
+ if (null == sTxt)
+ return;
+
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ _endRequested = true;
+ _logger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param")) {
+ _reloadRequested = true;
+ _logger
+ .info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+ _logger.info("New end date set to : " + oEnd);
+ _endTimeStamp = oEnd.getTime();
+ } catch (Exception eDat) {
+ _logger.info("Problems with endTime command", eDat);
+ }
+ }
+ } // ________________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
+ */
+ public void requestEnd()
+ {
+ _endRequested=true;
+ _endTimeStamp = 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
+ */
+ public boolean endRequested()
+ {
+ return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
+ */
+ public boolean endNotRequested()
+ {
+ return !endRequested();
+ }
+
+ public boolean isReloadNeeded()
+ {
+ // command to reload was received - force reload
+ if (_reloadRequested)
+ return refreshNextReload();
+
+ // Still not time to reload
+ if (System.currentTimeMillis() < _nextReload)
+ return false;
+
+ if (null==_parametersName)
+ return refreshNextReload();
+
+ File paramFile = new File(_parametersName);
+ if (! paramFile.exists())
+ return refreshNextReload();
+
+// check the TS on the file.
+ Long previousTimeStamp = _paramFileTimeStamps.get(_parametersName);
+ if (null==previousTimeStamp)
+ return refreshNextReload();
+
+
+ Long currentTimeStamp = paramFile.lastModified();
+ if (! previousTimeStamp.equals(currentTimeStamp))
+ return refreshNextReload();
+
+ setNextReloadTime(_config);
+ return false;
+ }
+
+ private boolean refreshNextReload()
+ {
+ setNextReloadTime(_config);
+ return true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
+ */
+ public boolean continueLooping()
+ {
+ return (endNotRequested() && !isReloadNeeded());
+ } // ________________________________
+
+ public static EPRManager getEprManager()
+ {
+ PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+ String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
+ return EPRManager.getInstance(sDir);
+ }
+
+ @Deprecated
+ private void register (String name, EPR address)
+ {
+ try { getEprManager().saveEPR(name,address); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot register service",e);
+ }
+ } // ________________________________
+ @Deprecated
+ private void unRegister (String name)
+ {
+ try { getEprManager().removeEPR(name); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot un-register service",e);
+ }
+ } // ________________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
+ */
+ public void register(ConfigTree config , EPR epr) throws RegistryException
+ {
+ String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ String serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+ {
+ register(serviceName,epr);
+ return;
+ }
+ String serviceDescription = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+ String eprDescription = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+ Registry registry = RegistryFactory.getRegistry();
+ synchronized (_registrySynch)
+ {
+ registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+ }
+ } //____________________________
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
+ */
+ public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+ {
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+ {
+ unRegister(serviceName);
+ return;
+ }
+ Registry registry = RegistryFactory.getRegistry();
+ synchronized (_registrySynch)
+ {
+ registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+ }
+ } //____________________________
+
+ public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException
+ {
+ return null;
+ } //____________________________
+
+ public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException
+ {
+ return null;
+ } //____________________________
+
+ /**
+  * Blocks until every entry in _childrenStarted is Boolean.TRUE, polling every
+  * _pauseTimeMillis milliseconds.
+  * <p/>
+  * This method will typically run in the invoker's thread (which might be the same as
+  * the run() thread if this manager was not launched in a separate thread).
+  * While _childrenStarted is still null (run() has not yet launched the children) the
+  * children are considered pending and the method keeps waiting.
+  * <p/>
+  * If the waiting thread is interrupted, the method returns immediately and leaves the
+  * thread's interrupt status set so that the caller can react to it.
+  */
+ public void waitUntilReady()
+ {
+     while (true)
+     {
+         boolean someChildPending = true;
+         // inspect the map under the same lock run() holds while (re)building it,
+         // so that a half-populated or concurrently modified map is never observed
+         synchronized (_synchAllReady)
+         {
+             if (null != _childrenStarted)
+             {
+                 someChildPending = false;
+                 for (Boolean curr : _childrenStarted.values())
+                     if (! Boolean.TRUE.equals(curr))
+                     {
+                         someChildPending = true;
+                         break;
+                     }
+             }
+         }
+         if (! someChildPending)
+             return;
+
+         try { Thread.sleep(_pauseTimeMillis); }
+         catch (InterruptedException e)
+         {
+             // restore the interrupt status instead of silently discarding it
+             Thread.currentThread().interrupt();
+             return;
+         }
+     }
+ } //____________________________
+
+ // Child processes must let us know when they're ready
+ public void update(Observable o, Object arg)
+ {
+ if (null!=_childrenStarted && (arg instanceof Boolean))
+ _childrenStarted.put(o.getClass(), (Boolean)arg);
+ } //____________________________
+
+
+ private static Logger _logger = Logger.getLogger(DefaultListenerManager.class);
+ private ConfigTree _config;
+ private CommandQueue _commandQueue;
+
+ private State _status = State.Uninitialised;
+ public State getState() { return _status; }
+
+ private Map<Class,Boolean> _childrenStarted;
+
+
+ private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
+ private String _sRldSecs=null;
+ private String _parametersName;
+
+ private boolean _reloadRequested, _endRequested;
+ private long _nextReload = Long.MAX_VALUE;
+ private long _endTimeStamp = Long.MAX_VALUE;
+
+ private Object _registrySynch = new Short((short)0);
+ private Object _synchReload = new Short((short)10);
+ private Object _synchAllReady = new Short((short)20);
+ // default interval between parameter reloads = 3 minutes (180000 ms)
+ protected int _defaultReloadIntervalMillis = 180000 ;
+
+ protected int _pauseTimeMillis;
+
+ } // ____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java 2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java 2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Observer;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.services.registry.RegistryException;
+
+/**
+ * ListenerManager interface.
+ * <p/> An implementation of this interface controls the lifecycle of a set of child listener processes.
+ */
+public interface ListenerManager extends Runnable, Observer
+{
+
+    /** Name of the configuration attribute holding the number of seconds between parameter reloads. */
+    public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
+    /** Name of the configuration attribute holding the end time, in the format of {@link #s_oDateParse}. */
+    public static final String PARM_END_TIME = "endTime";
+
+    // Attribute names to denote listener class to be instantiated in a child thread
+    // This attribute is not in the root node but in first level child elements
+    public static final String PARM_GATEWAY_CLASS = "gatewayClass";
+    public static final String PARM_LISTENER_CLASS = "listenerClass";
+
+    /**
+     * Parser for end time values. The hour field uses the 24-hour clock ('HH'), so that
+     * values such as "12:30:00" and "23:59:59" are interpreted as written.
+     * <p/>
+     * SimpleDateFormat instances are not thread-safe: code that may use this shared
+     * instance from several threads has to synchronize on it.
+     */
+    public static final SimpleDateFormat s_oDateParse = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
+
+    /**
+     * Will return when all controlled processes are up and running.
+     *
+     * @throws Exception if the implementation is unable to complete the wait
+     */
+    public void waitUntilReady() throws Exception;
+
+    /**
+     * Request an orderly shutdown of the managing process and its children.
+     */
+    public abstract void requestEnd();
+
+    /**
+     * Child threads will invoke this method to decide whether to continue or end their own process.
+     *
+     * @return true if the child should keep looping, false if it should end
+     */
+    public abstract boolean continueLooping();
+
+    /**
+     * This manager determines the maximum time that child processes can wait for an event.
+     *
+     * @return maximum wait time in milliseconds
+     */
+    public abstract long millisToWait();
+
+    /**
+     * Register an EPR in the registry.
+     *
+     * @param config - a config tree containing the deployment-configuration of the service.
+     * @param epr - the epr (EndPoint Reference) of the service.
+     *
+     * @throws RegistryException
+     */
+    public abstract void register(ConfigTree config, EPR epr)
+        throws RegistryException;
+
+    /**
+     * Unregister the EPR from the registry.
+     *
+     * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+     * @param serviceName - name of the service
+     * @param epr - the epr (EndPoint Reference) to be removed
+     * @throws RegistryException
+     */
+    public abstract void unRegister(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException;
+
+    /**
+     * Obtain an EPR under requested category and name
+     *
+     * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+     * @param serviceName - name of the service
+     * @return an EPR registered under the requested category and name
+     * @throws RegistryException
+     */
+    public abstract EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException;
+
+    /**
+     * Obtain a list of EPRs under requested category and name
+     *
+     * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+     * @param serviceName - name of the service
+     * @return the EPRs registered under the requested category and name
+     * @throws RegistryException
+     */
+    public abstract List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException;
+
+}
\ No newline at end of file
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java 2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java 2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,17 @@
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.internal.soa.esb.listeners.DefaultListenerManager;
+
+public class ListenerManagerFactory
+{
+ private static final ListenerManagerFactory _instance = new ListenerManagerFactory();
+ private ListenerManagerFactory() {}
+
+ public static ListenerManagerFactory getInstance() {return _instance; }
+
+ public ListenerManager getListenerManager(String parametersName)
+ throws Exception
+ {
+ return new DefaultListenerManager(parametersName);
+ }
+}
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,42 @@
+package org.jboss.soa.esb.listeners;
+
+import org.apache.log4j.Logger;
+
+public class ListenerUtil
+{
+
+ private ListenerUtil() {}
+
+ /**
+  * ListenerManager Launcher.
+  * <p/>
+  * Obtains a ListenerManager for the named configuration from the ListenerManagerFactory
+  * and runs it, either in a newly started thread or in the calling thread.
+  *
+  * @param parametersName String - used to retrieve the ConfigTree with run time configuration values
+  * @param inNewThread boolean - if 'true' will launch the ListenerManager in a new thread, and wait
+  * until controlled listeners are up and running
+  * <br/> if false, will instantiate and run the Controller in current thread - flow control will only
+  * return to invoker once the controller's run() method has finished
+  * @return the ListenerManager that was launched
+  * @throws Exception if the manager cannot be created, or if waiting for the listeners fails
+  */
+ public static ListenerManager launchManager(String parametersName, boolean inNewThread)
+     throws Exception
+ {
+     ListenerManager manager = ListenerManagerFactory.getInstance().getListenerManager(parametersName);
+     if (inNewThread)
+     {
+         // the manager has to be running before waiting on it: without this the
+         // child listeners are never launched and waitUntilReady() cannot return
+         new Thread(manager).start();
+
+         _logger.debug(Thread.currentThread()+ " Waiting for all child listeners to start");
+         manager.waitUntilReady();
+         _logger.debug(Thread.currentThread()+ " All child listeners ready");
+     }
+     else
+     {
+         StringBuilder sb = new StringBuilder("You are running a ListenerManager in your own thread")
+             .append(" - It won't return until it's run() method ends")
+         ;
+         _logger.warn(sb.toString());
+         manager.run();
+     }
+     return manager;
+ } //________________________________
+
+ private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
+}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-13 19:59:34 UTC (rev 8301)
@@ -50,7 +50,7 @@
* @since Version 4.0
*/
-public class MessageAwareListener implements Runnable, Observer
+public class MessageAwareListener extends Observable implements Runnable, Observer
{
/**
@@ -69,7 +69,19 @@
checkMyParms();
} // _______________________________
-
+
+ /**
+ * Wait until the registration process finished, a PickupCourier was obtained
+ * , and the pool thread is instantiated
+ */
+ public void waitUntilReadyToPickup()
+ {
+ while(null==_execService)
+ try { Thread.sleep(50); }
+ catch (InterruptedException e)
+ { _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
+ } //________________________________
+
// Child threads will send a -1 when their run() method ends
// we need to prevent picking up Messages when there are no available threads in pool
public void update(Observable o, Object arg)
@@ -106,6 +118,9 @@
_execService = Executors.newFixedThreadPool(_maxThreads);
_qRunningThreads = 0;
+ setChanged();
+ notifyObservers(Boolean.TRUE);
+
while (_controller.continueLooping())
{
// Check to see if there are available threads in pool
@@ -152,8 +167,10 @@
}
}
-
+ setChanged();
+ notifyObservers(Boolean.FALSE);
}
+
try { unregisterProcess(); }
catch (Exception re)
{
@@ -230,15 +247,7 @@
return (null != sVal) ? sVal : p_sDefault;
} // ________________________________
- public void waitUntilReadyToPickup()
- {
- while(null==_execService)
- try { Thread.sleep(50); }
- catch (InterruptedException e)
- { _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
- } //________________________________
-
protected ConfigTree _config = null;
protected EsbListenerController _controller = null;
More information about the jboss-svn-commits
mailing list