[jboss-svn-commits] JBL Code SVN: r8301 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: . internal internal/soa internal/soa/esb internal/soa/esb/listeners soa/esb/listeners soa/esb/listeners/message

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Dec 13 14:59:40 EST 2006


Author: estebanschifman
Date: 2006-12-13 14:59:34 -0500 (Wed, 13 Dec 2006)
New Revision: 8301

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
New ListenerManager and implementation classes (unified controller for Gateways and ESB aware listeners)

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,636 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.internal.soa.esb.listeners;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.util.EPRManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.State;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+/**
+ * Default implementation of the {@link org.jboss.soa.esb.listeners.ListenerManager} interface.
+ * <p/>
+ * Controlling class that will launch listener child threads for supported
+ * transport listener classes, as indicated in the configuration XML tree used in the constructor
+ * If you use the 'main' method, configuration file is expected in arg[0]
+ * 
+ * <p />
+ * Can be launched as uppermost controller (it has a main(args) method)
+ * <p />
+ * Also implements Runnable, and can thus be launched in a child thread from an
+ * upper controlling process
+ * <p />
+ * Listens on a JMS queue (with an optional message selector) for commands (e.g.
+ * Quiesce, Reload Parameters, Set End Time, etc.)
+ * <p />
+ * Parameter reloading can also be set using the PARM_RELOAD_SECS attribute
+ * <p />
+ * End time for this instance can also be set using the PARM_END_TIME attribute
+ * <p />
+ * Constructor will not return until the controller is up and running completely, with
+ * all managed listeners fully initalised.
+ * 
+ * @since Version 4.0
+ */
+
+public class DefaultListenerManager  implements ListenerManager
+{
+		/**
+		 * Construct a Listener Manager and configure it using the named repository entry
+		 * 
+		 * @param parametersName
+		 *            Name of the Repository entry containing the configuration.
+		 * @throws Exception
+		 *             Unable to load/use the named configuration.
+		 */
+		public DefaultListenerManager(String parametersName) throws Exception
+		{		
+			this(DefaultListenerManager.getListenerConfig(parametersName));
+			_parametersName = parametersName;
+		} //____________________________
+	
+		/**
+		 * Construct a Listener Controller using the specified listener configuration.
+		 * 
+		 * @param config
+		 *            The configuration.
+		 * @throws Exception
+		 *             Unable to load/use the supplied configuration.
+		 */
+		public DefaultListenerManager(ConfigTree config) throws Exception 
+		{
+			_pauseTimeMillis = 50;
+			_config = config;
+			_status = State.Loading_parameters;
+	
+			try {	checkParms(_config); }
+			catch (Exception e) 
+			{
+				String configSource = config.getAttribute("configSource");
+	
+				_status = State.Exception_thrown;
+				_status.setThrowable(e);
+				_logger.fatal("Listener configuration and startup error.  Config Source: "
+										+ (configSource != null ? configSource
+												: "unknown"), e);
+	
+				throw e;
+			}
+		} //____________________________
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
+		 */
+		/**
+		 * Load the named listener configuration from the configured parameter
+		 * repository.
+		 * 
+		 * @param reposParam
+		 *            The name of the repository entry containing the Listener
+		 *            configuration.
+		 * @return Listener Configuration as {@link ConfigTree}.
+		 * @throws IOException
+		 *             Unable to access the repository.
+		 * @throws ParamRepositoryException
+		 *             Unable to access the configuration in the repository.
+		 * @throws SAXException
+		 *             Unable to parse the configuration.
+		 */
+		private static ConfigTree getListenerConfig(String reposParam)
+				throws IOException, ParamRepositoryException, SAXException {		
+			String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+			ConfigTree config = ConfigTree.fromXml(sXml);
+	
+			config.setAttribute("configSource", "param-repository:" + reposParam);
+	
+			return config;
+		}
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
+		 */
+		public void checkParms(ConfigTree p_oP) throws Exception 
+		{
+			// We've just loaded - set to false until next reload requested
+			_reloadRequested = false;
+			if (null!=_parametersName)
+			{
+				File file = new File(_parametersName);
+				if (file.exists())
+					_paramFileTimeStamps.put(_parametersName, file.lastModified());
+			}
+			
+
+			_commandQueue = createCommandQueue(p_oP);
+	
+			// Open the command queue...
+			if (null!=_commandQueue)
+				_commandQueue.open(p_oP);
+	
+			setNextReloadTime(p_oP);
+
+			// if PARM_END_TIME not set try to run forever
+			// not a good practice if command queue is not set
+			// Expected date format is "yyyyMMdd hh:mm:ss"
+			String sEndT = p_oP.getAttribute(PARM_END_TIME);
+			_endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+					sEndT).getTime();
+	          
+		} // ________________________________
+	
+		private void setNextReloadTime(ConfigTree tree)
+		{
+			// if RELOAD_SECONDS_TAG not set, and no command queue
+			// then reload every 10 minutes
+			// If there is a command queue, run until command is received
+
+			_sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
+			synchronized (_synchReload) 
+			{
+				_nextReload = (null != _sRldSecs) 
+				? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
+				: (null == _commandQueue) 
+						? Long.MAX_VALUE 
+						: System.currentTimeMillis() + _defaultReloadIntervalMillis;
+			}
+			
+			
+		    if (null==_sRldSecs)
+		    {
+		    	String sMsg = (null==_commandQueue)
+		    		? " -  Using default of "+_sRldSecs
+		    		: " - Listener will run until stopped by command sent to queue";
+		    	_logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
+		    }
+
+		} //____________________________
+
+	    /**
+	     * Factory method for creating the command queue.
+	     * @param config DefaultListenerManager config.
+	     * @return DefaultListenerManager CommandQueue instance.
+	     */
+		private CommandQueue createCommandQueue(ConfigTree config) 
+		{
+			String commandQueueClass = config.getAttribute("command-queue-class");
+			if(commandQueueClass != null) 
+			{
+				try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
+				catch (Exception e) 
+				{
+					_logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to no Command Queue", e);
+				}
+			}
+				
+			return null;
+		} //____________________________
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
+		 */
+		public void run() 
+		{
+			boolean relaunch = true;
+			while (endNotRequested()) 
+			{
+				_status = State.Running;
+				if (relaunch)
+				{
+					synchronized (_synchAllReady)
+					{
+						_childrenStarted = new HashMap<Class, Boolean>();
+						for (ConfigTree oCurr : _config.getAllChildren()) 
+						{
+							String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
+							if (Util.isNullString(sClass))
+								continue;
+							tryToLaunchChildListener(oCurr, sClass);
+						}
+					}
+					waitUntilReady();
+				}
+				_status = State.Ready;
+	
+				waitForCmdOrSleep();
+	
+				if (endRequested())
+				{
+					break;
+				}
+
+				relaunch = false;
+				if (_parametersName != null && isReloadNeeded()) 
+					try 
+					{
+						_status = State.Loading_parameters;
+						_logger.debug("Reloading parameters _____________________________________________________");
+						ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
+						checkParms(oNew);
+						_config = oNew;
+						relaunch = true;
+					} 
+					catch (Exception e) 
+					{
+						_logger.error("Failed to reload parameters"
+								+ " - Continuing with cached version", e);
+					}
+			}
+			// m_oState = State.Shutting_down;
+	
+			_status = State.Done_OK;
+			_status.setCompletionCode(0);
+			_logger.debug("Finishing_____________________________________________________");
+	
+			// Close the command queue...
+			try {
+				if (null != _commandQueue)
+					_commandQueue.close();
+			} catch (CommandQueueException e) {
+				_logger.error("Error closing Command Queue.", e);
+			}
+		} // ________________________________
+	
+		private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName) 
+		{
+			try 
+			{
+				Class oListener = Class.forName(p_sClassName);
+				Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
+				Runnable listener = (Runnable)oConst.newInstance(new Object[] { this,p_oP});
+
+				// if you wish to monitor that child listener is ready, make it extend Observer
+				// and update observers with a Boolean.TRUE when it's ready
+				if (listener instanceof Observable)
+				{
+					_childrenStarted.put(listener.getClass(), Boolean.FALSE);
+					((Observable)listener).addObserver(this);
+				}
+				
+				new Thread(listener).start();
+			}
+			catch (Throwable e) 
+			{
+				_logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+			}
+		} // ________________________________
+	
+		public long millisToWait()
+		{
+			synchronized (_synchReload)
+			{
+				return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
+			}
+		} // ________________________________
+	
+		private void waitForCmdOrSleep() {
+			long lToGo = millisToWait();
+	
+			if (null == _commandQueue) {
+				_logger.debug("About to sleep " + lToGo);
+				// No command queue nor topic - Just sleep until time
+				// exhausted, or thread interrupted
+				try {
+					while ((lToGo=millisToWait()) > 0)
+						Thread.sleep(500);
+				} catch (InterruptedException e) {
+					_endTimeStamp = 0; // mark as end requested and return
+				}
+				return;
+			}
+	
+			// Wait for commands until time exhausted or command received
+			// Note that received commands might change time variables (reload/end)
+			// that's why time to go is recalculated on each cycle
+			while ((lToGo = millisToWait()) > 0) {
+				try {
+					_logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+	
+					String oM = _commandQueue.receiveCommand(lToGo);
+					if (null == oM) {
+						return;
+					}
+					processCommand(oM);
+					if (endRequested() || isReloadNeeded()) {
+						break;
+					}
+				} catch (CommandQueueException eJ) {
+					_logger.info("receive on command queue failed", eJ);
+				}
+			}
+		} // ________________________________
+	
+		/**
+		 * Processes the command that has been received in the command queue (or
+		 * topic) <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be
+		 * changed
+		 * 
+		 * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+		 * width="400"/> </COLGROUP>
+		 * <TR>
+		 * <TD align="center">message text</TD>
+		 * <TD align="center">effect</TD>
+		 * </TR>
+		 * <TR>
+		 * <TD>shutdown*</TD>
+		 * <TD>End time will be immediately set to 'now' - quiesce process will
+		 * start - Child threads will be allowed to finish normally</TD>
+		 * </TR>
+		 * <TR>
+		 * <TD>reload param*</TD>
+		 * <TD>Parameters will be immediately reloaded, and listener reconfigured
+		 * with new values</TD>
+		 * </TR>
+		 * <TR>
+		 * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+		 * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+		 * end of day assumed (23:59:59)</TD>
+		 * </TR>
+		 * </TABLE> * startsWith() <p/>
+		 * 
+		 * @param p_oMsg
+		 *            Message received from the command queue.
+		 * 
+		 */
+		private void processCommand(String sTxt) 
+		{
+			if (null == sTxt)
+				return;
+			
+			String sLow = sTxt.trim().toLowerCase();
+			if (sLow.startsWith("shutdown")) {
+				_endRequested = true;
+				_logger.info("Shutdown has been requested");
+				return;
+			}
+			if (sLow.startsWith("reload param")) {
+				_reloadRequested = true;
+				_logger
+						.info("Request for parameter reload has been received");
+				return;
+			}
+			String[] sa = sLow.split("\\s+");
+			if (sa.length > 1 && "endtime".equals(sa[0])) {
+				try {
+					String sDate = sa[1];
+					String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+							: sa[2];
+					Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+					_logger.info("New end date set to : " + oEnd);
+					_endTimeStamp = oEnd.getTime();
+				} catch (Exception eDat) {
+					_logger.info("Problems with endTime command", eDat);
+				}
+			}
+		} // ________________________________
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
+		 */
+		public void requestEnd() 
+		{
+			_endRequested=true;
+			_endTimeStamp = 0;
+		}
+		
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
+		 */
+		public boolean endRequested() 
+		{
+			return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+		}
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
+		 */
+		public boolean endNotRequested() 
+		{
+			return !endRequested();
+		}
+	
+		public boolean isReloadNeeded()
+		{
+			// command to reload was received - force reload
+			if (_reloadRequested)
+				return refreshNextReload();
+
+			// Still not time to reload
+			if (System.currentTimeMillis() < _nextReload)
+				return false;
+			
+			if (null==_parametersName)
+				return refreshNextReload();
+			
+			File paramFile = new File(_parametersName);
+			if (! paramFile.exists())
+				return refreshNextReload();
+			
+//			check the TS on the file.
+			Long previousTimeStamp	= _paramFileTimeStamps.get(_parametersName);
+			if (null==previousTimeStamp)
+				return refreshNextReload();
+
+
+			Long currentTimeStamp	= paramFile.lastModified();
+			if (! previousTimeStamp.equals(currentTimeStamp))
+				return refreshNextReload();
+
+			setNextReloadTime(_config);
+			return false;
+		}
+		
+		private boolean refreshNextReload()
+		{
+			setNextReloadTime(_config);
+			return true;
+		}
+
+	
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
+		 */
+		public boolean continueLooping() 
+		{
+			return (endNotRequested() && !isReloadNeeded());
+		} // ________________________________
+		
+		public static EPRManager getEprManager()
+		{
+			PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+			String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");	
+			return EPRManager.getInstance(sDir);
+		}
+		
+		@Deprecated
+		private void register (String name, EPR address)
+		{
+			try { getEprManager().saveEPR(name,address); }
+			catch (IOException e)
+			{
+				_logger.fatal("Cannot register service",e);
+			}
+		} // ________________________________
+		@Deprecated
+		private void unRegister (String name)
+		{
+			try { getEprManager().removeEPR(name); }
+			catch (IOException e)
+			{
+				_logger.fatal("Cannot un-register service",e);
+			}
+		} // ________________________________
+
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
+		 */
+		public void register(ConfigTree config , EPR epr) throws RegistryException
+		{
+			String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+			String serviceName         = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+			if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+			{
+					register(serviceName,epr);
+					return;
+			}
+			String serviceDescription  = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+			String eprDescription      = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+			Registry registry = RegistryFactory.getRegistry();
+			synchronized (_registrySynch)
+			{
+				registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+			}
+		} //____________________________
+		/* (non-Javadoc)
+		 * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
+		 */
+		public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+		{
+			if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+			{
+					unRegister(serviceName);
+					return;
+			}
+			Registry registry = RegistryFactory.getRegistry();
+			synchronized (_registrySynch)
+			{
+				registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+			}
+		} //____________________________
+	
+		public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException 
+		{
+			return null;
+		} //____________________________
+
+		public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException 
+		{
+			return null;
+		} //____________________________
+	
+		// this method will typically run in the invoker's thread 
+		// (which btw might be the same as the run() thread if this not launched in a separate thread
+		public void waitUntilReady() 
+		{
+			boolean someChildPending = true;
+			do
+			{
+				Collection<Boolean> allStarted = null;
+				synchronized (_synchAllReady)
+				{
+					allStarted = (null==_childrenStarted) ? null :  _childrenStarted.values(); 
+				}
+				if (null!= allStarted)
+				{
+					someChildPending = false;
+					for (Boolean curr : allStarted)
+						if (! Boolean.TRUE.equals(curr))
+							someChildPending = true;
+				}
+				if (someChildPending)
+					try { Thread.sleep(_pauseTimeMillis); }
+					catch (InterruptedException e) {	return; }
+					
+			} while (someChildPending);
+		} //____________________________
+		
+		// Child processes must let us know when they're ready
+		public void update(Observable o, Object arg) 
+		{
+			if (null!=_childrenStarted && (arg instanceof Boolean))
+				_childrenStarted.put(o.getClass(), (Boolean)arg);
+		} //____________________________
+
+	
+		private static	Logger 	_logger = Logger.getLogger(DefaultListenerManager.class);
+		private ConfigTree		_config;
+		private CommandQueue 	_commandQueue;
+
+		private State 			_status = State.Uninitialised;
+		public State getState() { return _status; }
+		
+		private Map<Class,Boolean> _childrenStarted;
+		
+		
+		private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
+		private String 			_sRldSecs=null;
+		private String 			_parametersName;
+
+		private boolean 		_reloadRequested, _endRequested;	
+		private long 			_nextReload 	= Long.MAX_VALUE;
+		private long 			_endTimeStamp 	= Long.MAX_VALUE;
+	
+		private Object 			_registrySynch 	= new Short((short)0);
+		private Object 			_synchReload 	= new Short((short)10);
+		private Object			_synchAllReady 	= new Short((short)20);
+		// default interval between parameter reloads = 6 minutes
+		protected int 			_defaultReloadIntervalMillis = 180000 ;
+		
+		protected int			_pauseTimeMillis;
+
+	} // ____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java	2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java	2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Observer;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.services.registry.RegistryException;
+
+/**
+ * ListenerManager interface.
+ * <p/> An implementation of this intervace controls the lifecycle of a set of child listener processes  
+ */
+public interface ListenerManager extends Runnable, Observer 
+{
+
+	public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
+	public static final String PARM_END_TIME 	= "endTime";
+
+	// Attribute names to denote listener class to be instantiated in a child thread
+	// This attribute is not in the root node but in first level child elements
+	public static final String PARM_GATEWAY_CLASS	= "gatewayClass";
+	public static final String PARM_LISTENER_CLASS	= "listenerClass";
+	public static final SimpleDateFormat s_oDateParse  = new SimpleDateFormat("yyyyMMdd hh:mm:ss");
+	
+	// Will return when all controlled processes are up and running
+	public void waitUntilReady() throws Exception;
+	// Method to request an orderly shutdown of the managing process and it's children
+	public abstract void requestEnd();
+	// Child threads will invoke this method to continue or end their own process
+	public abstract boolean continueLooping();
+	// This manager determines maximum time that child processes can wait for an event
+	public abstract long millisToWait();
+	
+	/**
+	 * Register an EPR in the registry.
+	 * 
+	 * @param config - a config tree containing the deployment-configuration of the service.
+	 * @param epr - the epr (EndPoint Reference) of the service.
+	 * 
+	 * @throws RegistryException
+	 */
+	public abstract void register(ConfigTree config, EPR epr)
+			throws RegistryException;
+
+	/**
+	 * Unregister the EPR from the registry.
+	 * 
+	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+	 * @param serviceName         - name of the service ("
+	 * @param epr
+	 * @throws RegistryException
+	 */
+	public abstract void unRegister(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException;
+
+	/**
+	 * Obtain an EPR under requested category and name
+	 * 
+	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+	 * @param serviceName         - name of the service ("
+	 * @throws RegistryException
+	 */
+	public abstract EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException;
+
+	/**
+	 * Obtain a list of EPRs under requested category and name
+	 * 
+	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+	 * @param serviceName         - name of the service ("
+	 * @throws RegistryException
+	 */
+	public abstract List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException;
+
+}
\ No newline at end of file

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java	2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManagerFactory.java	2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,17 @@
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.internal.soa.esb.listeners.DefaultListenerManager;
+
+public class ListenerManagerFactory 
+{
+	private static final ListenerManagerFactory _instance = new ListenerManagerFactory();
+	private ListenerManagerFactory() {}
+	
+	public static ListenerManagerFactory getInstance() {return _instance; }
+	
+	public ListenerManager getListenerManager(String parametersName)
+		throws Exception
+	{
+		return new DefaultListenerManager(parametersName);
+	}
+}

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-13 19:59:34 UTC (rev 8301)
@@ -0,0 +1,42 @@
+package org.jboss.soa.esb.listeners;
+
+import org.apache.log4j.Logger;
+
+public class ListenerUtil 
+{
+
+	private ListenerUtil() {}
+	
+	/**
+	 * ListenerManager Launcher.
+	 * @param parametersName String - used to retrieve the ConfigTree with run time configuration values
+	 * @param inNewThread boolean - if 'true' will launch the ListenerManager in a new thread, and wait 
+	 * until controlled listeners are up and running
+	 * <br/> if false, will instantiate and run the Controller in current thread - flow control will only
+	 * return to invoker once the controller's run() method has finished
+	 * @return
+	 * @throws Exception
+	 */
+	public static ListenerManager launchManager(String parametersName, boolean inNewThread)
+		throws Exception
+	{
+		ListenerManager manager = ListenerManagerFactory.getInstance().getListenerManager(parametersName);
+		if (inNewThread)
+		{
+			_logger.debug(Thread.currentThread()+ " Waiting for all child listeners to start");
+			manager.waitUntilReady();
+			_logger.debug(Thread.currentThread()+ " All child listeners ready");
+		}
+		else
+		{
+			StringBuilder sb = new StringBuilder("You are running a ListenerManager in your own thread")
+				.append(" - It won't return until it's run() method ends")
+			;
+			_logger.warn(sb.toString());
+			manager.run();
+		}
+		return manager;
+	} //________________________________
+	
+	private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
+}

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-13 19:33:47 UTC (rev 8300)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-13 19:59:34 UTC (rev 8301)
@@ -50,7 +50,7 @@
  * @since Version 4.0
  */
 
-public class MessageAwareListener implements Runnable, Observer
+public class MessageAwareListener extends Observable implements Runnable, Observer
 {
 	
 	/**
@@ -69,7 +69,19 @@
     	
     	checkMyParms();
     } // _______________________________
-    
+
+    /**
+     * Wait until the registration process finished, a PickupCourier was obtained
+     * , and the pool thread is instantiated
+     */
+	public void waitUntilReadyToPickup() 
+	{
+		while(null==_execService) 
+			try { Thread.sleep(50); }
+			catch (InterruptedException e) 
+				{ _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
+	} //________________________________
+
     // Child threads will send a -1  when their run() method ends
     // we need to prevent picking up Messages when there are no available threads in pool
 	public void update(Observable o, Object arg) 
@@ -106,6 +118,9 @@
 			_execService = Executors.newFixedThreadPool(_maxThreads);
 			_qRunningThreads = 0;
 			
+			setChanged();
+			notifyObservers(Boolean.TRUE);
+			
 	    	while (_controller.continueLooping())
 	        {
 	    		// Check to see if there are available threads in pool
@@ -152,8 +167,10 @@
 	
 	        	}
 	        }
-	    	
+	    	setChanged();
+	    	notifyObservers(Boolean.FALSE);	    	
 		}
+		
     	try { unregisterProcess(); }
 	   	catch (Exception re) 
 	   	{
@@ -230,15 +247,7 @@
     	return (null != sVal) ? sVal : p_sDefault;
 	} // ________________________________
     
-	public void waitUntilReadyToPickup() 
-	{
-		while(null==_execService) 
-			try { Thread.sleep(50); }
-			catch (InterruptedException e) 
-				{ _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
-	} //________________________________
 
-
 	protected ConfigTree 			_config = null;
 	protected EsbListenerController _controller = null;
 




More information about the jboss-svn-commits mailing list