[jboss-svn-commits] JBL Code SVN: r8326 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: internal/soa/esb/listeners soa/esb/listeners soa/esb/listeners/message

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Dec 14 13:49:48 EST 2006


Author: estebanschifman
Date: 2006-12-14 13:49:41 -0500 (Thu, 14 Dec 2006)
New Revision: 8326

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Work on new managed listeners and Listenermanager

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-14 18:49:41 UTC (rev 8326)
@@ -35,11 +35,13 @@
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.command.CommandQueue;
 import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.util.EPRManager;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
 import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
 import org.jboss.soa.esb.listeners.ListenerManager;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.State;
@@ -81,556 +83,583 @@
 
 public class DefaultListenerManager  implements ListenerManager
 {
-		/**
-		 * Construct a Listener Manager and configure it using the named repository entry
-		 * 
-		 * @param parametersName
-		 *            Name of the Repository entry containing the configuration.
-		 * @throws Exception
-		 *             Unable to load/use the named configuration.
-		 */
-		public DefaultListenerManager(String parametersName) throws Exception
-		{		
-			this(DefaultListenerManager.getListenerConfig(parametersName));
-			_parametersName = parametersName;
-		} //____________________________
-	
-		/**
-		 * Construct a Listener Controller using the specified listener configuration.
-		 * 
-		 * @param config
-		 *            The configuration.
-		 * @throws Exception
-		 *             Unable to load/use the supplied configuration.
-		 */
-		public DefaultListenerManager(ConfigTree config) throws Exception 
+	/**
+	 * Construct a Listener Manager and configure it using the named repository entry
+	 * 
+	 * @param parametersName
+	 *            Name of the Repository entry containing the configuration.
+	 * @throws Exception
+	 *             Unable to load/use the named configuration.
+	 */
+	public DefaultListenerManager(String parametersName) throws Exception
+	{		
+		this(DefaultListenerManager.getListenerConfig(parametersName));
+		_parametersName = parametersName;
+	} //____________________________
+
+	/**
+	 * Construct a Listener Controller using the specified listener configuration.
+	 * 
+	 * @param config
+	 *            The configuration.
+	 * @throws Exception
+	 *             Unable to load/use the supplied configuration.
+	 */
+	public DefaultListenerManager(ConfigTree config) throws Exception 
+	{
+		// default sleep time when waiting for events to happen in other threads
+		_pauseTimeMillis = 50;
+		// default interval between parameter reloads = 3 minutes (180000 ms)
+		_defaultReloadIntervalMillis = 180000;
+
+		_config = config;
+		_status = State.Loading_parameters;
+
+		try {	checkParms(_config); }
+		catch (Exception e) 
 		{
-			_pauseTimeMillis = 50;
-			_config = config;
-			_status = State.Loading_parameters;
-	
-			try {	checkParms(_config); }
-			catch (Exception e) 
-			{
-				String configSource = config.getAttribute("configSource");
-	
-				_status = State.Exception_thrown;
-				_status.setThrowable(e);
-				_logger.fatal("Listener configuration and startup error.  Config Source: "
-										+ (configSource != null ? configSource
-												: "unknown"), e);
-	
-				throw e;
-			}
-		} //____________________________
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
-		 */
-		/**
-		 * Load the named listener configuration from the configured parameter
-		 * repository.
-		 * 
-		 * @param reposParam
-		 *            The name of the repository entry containing the Listener
-		 *            configuration.
-		 * @return Listener Configuration as {@link ConfigTree}.
-		 * @throws IOException
-		 *             Unable to access the repository.
-		 * @throws ParamRepositoryException
-		 *             Unable to access the configuration in the repository.
-		 * @throws SAXException
-		 *             Unable to parse the configuration.
-		 */
-		private static ConfigTree getListenerConfig(String reposParam)
-				throws IOException, ParamRepositoryException, SAXException {		
-			String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
-			ConfigTree config = ConfigTree.fromXml(sXml);
-	
-			config.setAttribute("configSource", "param-repository:" + reposParam);
-	
-			return config;
+			String configSource = config.getAttribute("configSource");
+
+			_status = State.Exception_thrown;
+			_status.setThrowable(e);
+			_logger.fatal("Listener configuration and startup error.  Config Source: "
+									+ (configSource != null ? configSource
+											: "unknown"), e);
+
+			throw e;
 		}
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
-		 */
-		public void checkParms(ConfigTree p_oP) throws Exception 
+	} //____________________________
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
+	 */
+	/**
+	 * Load the named listener configuration from the configured parameter
+	 * repository.
+	 * 
+	 * @param reposParam
+	 *            The name of the repository entry containing the Listener
+	 *            configuration.
+	 * @return Listener Configuration as {@link ConfigTree}.
+	 * @throws IOException
+	 *             Unable to access the repository.
+	 * @throws ParamRepositoryException
+	 *             Unable to access the configuration in the repository.
+	 * @throws SAXException
+	 *             Unable to parse the configuration.
+	 */
+	private static ConfigTree getListenerConfig(String reposParam)
+			throws IOException, ParamRepositoryException, SAXException 
+	{		
+		String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+		ConfigTree config = ConfigTree.fromXml(sXml);
+
+		config.setAttribute("configSource", "param-repository:" + reposParam);
+
+		return config;
+	} //____________________________
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
+	 */
+	public void checkParms(ConfigTree p_oP) throws Exception 
+	{
+		// We've just loaded - set to false until next reload requested
+		_reloadRequested = false;
+		if (null!=_parametersName)
 		{
-			// We've just loaded - set to false until next reload requested
-			_reloadRequested = false;
-			if (null!=_parametersName)
-			{
-				File file = new File(_parametersName);
-				if (file.exists())
-					_paramFileTimeStamps.put(_parametersName, file.lastModified());
-			}
-			
+			File file = new File(_parametersName);
+			if (file.exists())
+				_paramFileTimeStamps.put(_parametersName, file.lastModified());
+		}
+		
 
-			_commandQueue = createCommandQueue(p_oP);
-	
-			// Open the command queue...
-			if (null!=_commandQueue)
-				_commandQueue.open(p_oP);
-	
-			setNextReloadTime(p_oP);
+		_commandQueue = createCommandQueue(p_oP);
 
-			// if PARM_END_TIME not set try to run forever
-			// not a good practice if command queue is not set
-			// Expected date format is "yyyyMMdd hh:mm:ss"
-			String sEndT = p_oP.getAttribute(PARM_END_TIME);
-			_endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
-					sEndT).getTime();
-	          
-		} // ________________________________
-	
-		private void setNextReloadTime(ConfigTree tree)
+		// Open the command queue...
+		if (null!=_commandQueue)
+			_commandQueue.open(p_oP);
+
+		setNextReloadTime(p_oP);
+
+		// if PARM_END_TIME not set try to run forever
+		// not a good practice if command queue is not set
+		// Expected date format is "yyyyMMdd hh:mm:ss"
+		String sEndT = p_oP.getAttribute(PARM_END_TIME);
+		_endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+				sEndT).getTime();
+		
+		for (ConfigTree tree : _config.getAllChildren())
 		{
-			// if RELOAD_SECONDS_TAG not set, and no command queue
-			// then reload every 10 minutes
-			// If there is a command queue, run until command is received
+			String gateway	= tree.getAttribute(PARM_GATEWAY_CLASS);
+			String listener = tree.getAttribute(PARM_LISTENER_CLASS);
+			if (null==gateway || null==listener)
+				continue;
+			StringBuilder sb = new StringBuilder()
+				.append("Either "+PARM_GATEWAY_CLASS+" or "+PARM_LISTENER_CLASS)
+				.append(" can be specified, but not both")
+			;
+			_logger.error("Child listener class must be gateway or listener - can't be both");
+			throw new ConfigurationException(sb.toString());
+		}
+          
+	} // ________________________________
 
-			_sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
-			synchronized (_synchReload) 
-			{
-				_nextReload = (null != _sRldSecs) 
-				? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
-				: (null == _commandQueue) 
-						? Long.MAX_VALUE 
-						: System.currentTimeMillis() + _defaultReloadIntervalMillis;
-			}
-			
-			
-		    if (null==_sRldSecs)
-		    {
-		    	String sMsg = (null==_commandQueue)
-		    		? " -  Using default of "+_sRldSecs
-		    		: " - Listener will run until stopped by command sent to queue";
-		    	_logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
-		    }
+	private void setNextReloadTime(ConfigTree tree)
+	{
+		// if RELOAD_SECONDS_TAG not set, and no command queue
+		// then reload every 10 minutes
+		// If there is a command queue, run until command is received
 
-		} //____________________________
+		_sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
+		synchronized (_synchReload) 
+		{
+			_nextReload = (null != _sRldSecs) 
+			? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
+			: (null == _commandQueue) 
+					? Long.MAX_VALUE 
+					: System.currentTimeMillis() + _defaultReloadIntervalMillis;
+		}
+		
+		
+	    if (null==_sRldSecs)
+	    {
+	    	String sMsg = (null==_commandQueue)
+	    		? " -  Using default of "+_sRldSecs
+	    		: " - Listener will run until stopped by command sent to queue";
+	    	_logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
+	    }
 
-	    /**
-	     * Factory method for creating the command queue.
-	     * @param config DefaultListenerManager config.
-	     * @return DefaultListenerManager CommandQueue instance.
-	     */
-		private CommandQueue createCommandQueue(ConfigTree config) 
+	} //____________________________
+
+    /**
+     * Factory method for creating the command queue.
+     * @param config DefaultListenerManager config.
+     * @return DefaultListenerManager CommandQueue instance.
+     */
+	private CommandQueue createCommandQueue(ConfigTree config) 
+	{
+		String commandQueueClass = config.getAttribute("command-queue-class");
+		if(commandQueueClass != null) 
 		{
-			String commandQueueClass = config.getAttribute("command-queue-class");
-			if(commandQueueClass != null) 
+			try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
+			catch (Exception e) 
 			{
-				try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
-				catch (Exception e) 
-				{
-					_logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to no Command Queue", e);
-				}
+				_logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to no Command Queue", e);
 			}
-				
-			return null;
-		} //____________________________
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
-		 */
-		public void run() 
+		}
+			
+		return null;
+	} //____________________________
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
+	 */
+	public void run() 
+	{
+		boolean relaunch = true;
+		while (endNotRequested()) 
 		{
-			boolean relaunch = true;
-			while (endNotRequested()) 
+			_status = State.Running;
+			if (relaunch)
 			{
-				_status = State.Running;
-				if (relaunch)
+				synchronized (_synchAllReady)
 				{
-					synchronized (_synchAllReady)
+					_childrenStarted = new HashMap<Class, Boolean>();
+					for (ConfigTree oCurr : _config.getAllChildren()) 
 					{
-						_childrenStarted = new HashMap<Class, Boolean>();
-						for (ConfigTree oCurr : _config.getAllChildren()) 
-						{
-							String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
-							if (Util.isNullString(sClass))
-								continue;
-							tryToLaunchChildListener(oCurr, sClass);
-						}
+						String sClass 	= oCurr.getAttribute(PARM_LISTENER_CLASS);
+						if (Util.isNullString(sClass))
+							sClass 		= oCurr.getAttribute(PARM_GATEWAY_CLASS);
+						if (Util.isNullString(sClass))
+							continue;
+						
+						tryToLaunchChildListener(oCurr, sClass);
 					}
-					waitUntilReady();
 				}
-				_status = State.Ready;
-	
-				waitForCmdOrSleep();
-	
-				if (endRequested())
+				waitUntilReady();
+			}
+			_status = State.Ready;
+
+			waitForCmdOrSleep();
+
+			if (endRequested())
+			{
+				break;
+			}
+
+			relaunch = false;
+			if (_parametersName != null && isReloadNeeded()) 
+				try 
 				{
-					break;
+					_status = State.Loading_parameters;
+					_logger.debug("Reloading parameters _____________________________________________________");
+					ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
+					checkParms(oNew);
+					_config = oNew;
+					relaunch = true;
+				} 
+				catch (Exception e) 
+				{
+					_logger.error("Failed to reload parameters"
+							+ " - Continuing with cached version", e);
 				}
+		}
+		// m_oState = State.Shutting_down;
 
-				relaunch = false;
-				if (_parametersName != null && isReloadNeeded()) 
-					try 
-					{
-						_status = State.Loading_parameters;
-						_logger.debug("Reloading parameters _____________________________________________________");
-						ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
-						checkParms(oNew);
-						_config = oNew;
-						relaunch = true;
-					} 
-					catch (Exception e) 
-					{
-						_logger.error("Failed to reload parameters"
-								+ " - Continuing with cached version", e);
-					}
-			}
-			// m_oState = State.Shutting_down;
-	
-			_status = State.Done_OK;
-			_status.setCompletionCode(0);
-			_logger.debug("Finishing_____________________________________________________");
-	
-			// Close the command queue...
-			try {
-				if (null != _commandQueue)
-					_commandQueue.close();
-			} catch (CommandQueueException e) {
-				_logger.error("Error closing Command Queue.", e);
-			}
-		} // ________________________________
-	
-		private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName) 
+		_status = State.Done_OK;
+		_status.setCompletionCode(0);
+		_logger.debug("Finishing_____________________________________________________");
+
+		// Close the command queue...
+		try {
+			if (null != _commandQueue)
+				_commandQueue.close();
+		} catch (CommandQueueException e) {
+			_logger.error("Error closing Command Queue.", e);
+		}
+	} // ________________________________
+
+	private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName) 
+	{
+		try 
 		{
+			Class oListener = Class.forName(p_sClassName);
+			Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
+			AbstractManagedListener listener = (AbstractManagedListener)
+					oConst.newInstance(new Object[] { this,p_oP});
+
+			_childrenStarted.put(listener.getClass(), Boolean.FALSE);
+			((Observable)listener).addObserver(this);
+			new Thread(listener).start();
+		}
+		catch (Throwable e) 
+		{
+			_logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+		}
+	} // ________________________________
+
+	public long millisToWait()
+	{
+		synchronized (_synchReload)
+		{
+			return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
+		}
+	} // ________________________________
+
+	private void waitForCmdOrSleep() 
+	{
+		long lToGo = millisToWait();
+
+		if (null == _commandQueue) 
+		{
+			_logger.debug("About to sleep " + lToGo);
+			// No command queue nor topic - Just sleep until time
+			// exhausted, or thread interrupted
 			try 
 			{
-				Class oListener = Class.forName(p_sClassName);
-				Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
-				Runnable listener = (Runnable)oConst.newInstance(new Object[] { this,p_oP});
-
-				// if you wish to monitor that child listener is ready, make it extend Observer
-				// and update observers with a Boolean.TRUE when it's ready
-				if (listener instanceof Observable)
-				{
-					_childrenStarted.put(listener.getClass(), Boolean.FALSE);
-					((Observable)listener).addObserver(this);
-				}
-				
-				new Thread(listener).start();
-			}
-			catch (Throwable e) 
+				while ((lToGo=millisToWait()) > 0)
+					Thread.sleep(500);
+			} 
+			catch (InterruptedException e) 
 			{
-				_logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+				_endTimeStamp = 0; // mark as end requested and return
 			}
-		} // ________________________________
-	
-		public long millisToWait()
+			return;
+		}
+
+		// Wait for commands until time exhausted or command received
+		// Note that received commands might change time variables (reload/end)
+		// that's why time to go is recalculated on each cycle
+		while ((lToGo = millisToWait()) > 0) 
 		{
-			synchronized (_synchReload)
+			try 
 			{
-				return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
-			}
-		} // ________________________________
-	
-		private void waitForCmdOrSleep() {
-			long lToGo = millisToWait();
-	
-			if (null == _commandQueue) {
-				_logger.debug("About to sleep " + lToGo);
-				// No command queue nor topic - Just sleep until time
-				// exhausted, or thread interrupted
-				try {
-					while ((lToGo=millisToWait()) > 0)
-						Thread.sleep(500);
-				} catch (InterruptedException e) {
-					_endTimeStamp = 0; // mark as end requested and return
+				_logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+				String oM = _commandQueue.receiveCommand(lToGo);
+				if (null == oM) {
+					return;
 				}
-				return;
-			}
-	
-			// Wait for commands until time exhausted or command received
-			// Note that received commands might change time variables (reload/end)
-			// that's why time to go is recalculated on each cycle
-			while ((lToGo = millisToWait()) > 0) {
-				try {
-					_logger.info("Waiting for command ... timeout=" + lToGo + " millis");
-	
-					String oM = _commandQueue.receiveCommand(lToGo);
-					if (null == oM) {
-						return;
-					}
-					processCommand(oM);
-					if (endRequested() || isReloadNeeded()) {
-						break;
-					}
-				} catch (CommandQueueException eJ) {
-					_logger.info("receive on command queue failed", eJ);
+				processCommand(oM);
+				if (endRequested() || isReloadNeeded()) {
+					break;
 				}
+			} 
+			catch (CommandQueueException eJ) 
+			{
+				_logger.info("receive on command queue failed", eJ);
 			}
-		} // ________________________________
-	
-		/**
-		 * Processes the command that has been received in the command queue (or
-		 * topic) <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be
-		 * changed
-		 * 
-		 * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
-		 * width="400"/> </COLGROUP>
-		 * <TR>
-		 * <TD align="center">message text</TD>
-		 * <TD align="center">effect</TD>
-		 * </TR>
-		 * <TR>
-		 * <TD>shutdown*</TD>
-		 * <TD>End time will be immediately set to 'now' - quiesce process will
-		 * start - Child threads will be allowed to finish normally</TD>
-		 * </TR>
-		 * <TR>
-		 * <TD>reload param*</TD>
-		 * <TD>Parameters will be immediately reloaded, and listener reconfigured
-		 * with new values</TD>
-		 * </TR>
-		 * <TR>
-		 * <TD>endTime yyyyMMdd hh:mm:ss</TD>
-		 * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
-		 * end of day assumed (23:59:59)</TD>
-		 * </TR>
-		 * </TABLE> * startsWith() <p/>
-		 * 
-		 * @param p_oMsg
-		 *            Message received from the command queue.
-		 * 
-		 */
-		private void processCommand(String sTxt) 
-		{
-			if (null == sTxt)
-				return;
-			
-			String sLow = sTxt.trim().toLowerCase();
-			if (sLow.startsWith("shutdown")) {
-				_endRequested = true;
-				_logger.info("Shutdown has been requested");
-				return;
-			}
-			if (sLow.startsWith("reload param")) {
-				_reloadRequested = true;
-				_logger
-						.info("Request for parameter reload has been received");
-				return;
-			}
-			String[] sa = sLow.split("\\s+");
-			if (sa.length > 1 && "endtime".equals(sa[0])) {
-				try {
-					String sDate = sa[1];
-					String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
-							: sa[2];
-					Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
-					_logger.info("New end date set to : " + oEnd);
-					_endTimeStamp = oEnd.getTime();
-				} catch (Exception eDat) {
-					_logger.info("Problems with endTime command", eDat);
-				}
-			}
-		} // ________________________________
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
-		 */
-		public void requestEnd() 
-		{
-			_endRequested=true;
-			_endTimeStamp = 0;
 		}
+	} // ________________________________
+
+	/**
+	 * Processes the command that has been received in the command queue (or
+	 * topic). <p/>_endRequested, _reloadRequested, and _endTimeStamp could be
+	 * changed as a result.
+	 * 
+	 * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+	 * width="400"/> </COLGROUP>
+	 * <TR>
+	 * <TD align="center">message text</TD>
+	 * <TD align="center">effect</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>shutdown*</TD>
+	 * <TD>End time will be immediately set to 'now' - quiesce process will
+	 * start - Child threads will be allowed to finish normally</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>reload param*</TD>
+	 * <TD>Parameters will be immediately reloaded, and listener reconfigured
+	 * with new values</TD>
+	 * </TR>
+	 * <TR>
+	 * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+	 * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+	 * end of day assumed (23:59:59)</TD>
+	 * </TR>
+	 * </TABLE> * startsWith() <p/>
+	 * 
+	 * @param sTxt
+	 *            Message received from the command queue.
+	 * 
+	 */
+	private void processCommand(String sTxt) 
+	{
+		if (null == sTxt)
+			return;
 		
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
-		 */
-		public boolean endRequested() 
-		{
-			return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+		String sLow = sTxt.trim().toLowerCase();
+		if (sLow.startsWith("shutdown")) {
+			_endRequested = true;
+			_logger.info("Shutdown has been requested");
+			return;
 		}
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
-		 */
-		public boolean endNotRequested() 
-		{
-			return !endRequested();
+		if (sLow.startsWith("reload param")) {
+			_reloadRequested = true;
+			_logger
+					.info("Request for parameter reload has been received");
+			return;
 		}
+		String[] sa = sLow.split("\\s+");
+		if (sa.length > 1 && "endtime".equals(sa[0])) {
+			try {
+				String sDate = sa[1];
+				String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+						: sa[2];
+				Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+				_logger.info("New end date set to : " + oEnd);
+				_endTimeStamp = oEnd.getTime();
+			} catch (Exception eDat) {
+				_logger.info("Problems with endTime command", eDat);
+			}
+		}
+	} // ________________________________
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
+	 */
+	public void requestEnd() 
+	{
+		_endRequested=true;
+		_endTimeStamp = 0;
+	}
 	
-		public boolean isReloadNeeded()
-		{
-			// command to reload was received - force reload
-			if (_reloadRequested)
-				return refreshNextReload();
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
+	 */
+	public boolean endRequested() 
+	{
+		return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+	}
 
-			// Still not time to reload
-			if (System.currentTimeMillis() < _nextReload)
-				return false;
-			
-			if (null==_parametersName)
-				return refreshNextReload();
-			
-			File paramFile = new File(_parametersName);
-			if (! paramFile.exists())
-				return refreshNextReload();
-			
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
+	 */
+	public boolean endNotRequested() 
+	{
+		return !endRequested();
+	}
+
+	public boolean isReloadNeeded()
+	{
+		// command to reload was received - force reload
+		if (_reloadRequested)
+			return refreshNextReload();
+
+		// Still not time to reload
+		if (System.currentTimeMillis() < _nextReload)
+			return false;
+		
+		if (null==_parametersName)
+			return refreshNextReload();
+		
+		File paramFile = new File(_parametersName);
+		if (! paramFile.exists())
+			return refreshNextReload();
+		
 //			check the TS on the file.
-			Long previousTimeStamp	= _paramFileTimeStamps.get(_parametersName);
-			if (null==previousTimeStamp)
-				return refreshNextReload();
+		Long previousTimeStamp	= _paramFileTimeStamps.get(_parametersName);
+		if (null==previousTimeStamp)
+			return refreshNextReload();
 
 
-			Long currentTimeStamp	= paramFile.lastModified();
-			if (! previousTimeStamp.equals(currentTimeStamp))
-				return refreshNextReload();
+		Long currentTimeStamp	= paramFile.lastModified();
+		if (! previousTimeStamp.equals(currentTimeStamp))
+			return refreshNextReload();
 
-			setNextReloadTime(_config);
-			return false;
+		setNextReloadTime(_config);
+		return false;
+	}
+	
+	private boolean refreshNextReload()
+	{
+		setNextReloadTime(_config);
+		return true;
+	}
+
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
+	 */
+	public boolean continueLooping() 
+	{
+		return (endNotRequested() && !isReloadNeeded());
+	} // ________________________________
+	
+	public static EPRManager getEprManager()
+	{
+		PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+		String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");	
+		return EPRManager.getInstance(sDir);
+	}
+	
+	@Deprecated
+	private void register (String name, EPR address)
+	{
+		try { getEprManager().saveEPR(name,address); }
+		catch (IOException e)
+		{
+			_logger.fatal("Cannot register service",e);
 		}
-		
-		private boolean refreshNextReload()
+	} // ________________________________
+	@Deprecated
+	private void unRegister (String name)
+	{
+		try { getEprManager().removeEPR(name); }
+		catch (IOException e)
 		{
-			setNextReloadTime(_config);
-			return true;
+			_logger.fatal("Cannot un-register service",e);
 		}
+	} // ________________________________
 
-	
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
-		 */
-		public boolean continueLooping() 
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
+	 */
+	public void register(ConfigTree config , EPR epr) throws RegistryException
+	{
+		String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+		String serviceName         = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+		if ("eprManager".equalsIgnoreCase(serviceCategoryName))
 		{
-			return (endNotRequested() && !isReloadNeeded());
-		} // ________________________________
-		
-		public static EPRManager getEprManager()
+				register(serviceName,epr);
+				return;
+		}
+		String serviceDescription  = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+		String eprDescription      = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+		Registry registry = RegistryFactory.getRegistry();
+		synchronized (_synchRegistry)
 		{
-			PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
-			String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");	
-			return EPRManager.getInstance(sDir);
+			registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
 		}
-		
-		@Deprecated
-		private void register (String name, EPR address)
+	} //____________________________
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
+	 */
+	public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+	{
+		if ("eprManager".equalsIgnoreCase(serviceCategoryName))
 		{
-			try { getEprManager().saveEPR(name,address); }
-			catch (IOException e)
-			{
-				_logger.fatal("Cannot register service",e);
-			}
-		} // ________________________________
-		@Deprecated
-		private void unRegister (String name)
+				unRegister(serviceName);
+				return;
+		}
+		Registry registry = RegistryFactory.getRegistry();
+		synchronized (_synchRegistry)
 		{
-			try { getEprManager().removeEPR(name); }
-			catch (IOException e)
-			{
-				_logger.fatal("Cannot un-register service",e);
-			}
-		} // ________________________________
+			registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+		}
+	} //____________________________
 
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
-		 */
-		public void register(ConfigTree config , EPR epr) throws RegistryException
+	public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException 
+	{
+		return null;
+	} //____________________________
+
+	public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException 
+	{
+		return null;
+	} //____________________________
+
+	// this method will typically run in the invoker's thread 
+	// (which, by the way, might be the same as the run() thread if this is not launched in a separate thread)
+	public void waitUntilReady() 
+	{
+		boolean someChildPending = true;
+		do
 		{
-			String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-			String serviceName         = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-			if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+			Collection<Boolean> allStarted = null;
+			synchronized (_synchAllReady)
 			{
-					register(serviceName,epr);
-					return;
+				allStarted = (null==_childrenStarted) ? null :  _childrenStarted.values(); 
 			}
-			String serviceDescription  = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
-			String eprDescription      = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
-			Registry registry = RegistryFactory.getRegistry();
-			synchronized (_registrySynch)
+			if (null!= allStarted)
 			{
-				registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+				someChildPending = false;
+				for (Boolean curr : allStarted)
+					if (! Boolean.TRUE.equals(curr))
+						someChildPending = true;
 			}
-		} //____________________________
-		/* (non-Javadoc)
-		 * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
-		 */
-		public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
-		{
-			if ("eprManager".equalsIgnoreCase(serviceCategoryName))
-			{
-					unRegister(serviceName);
-					return;
-			}
-			Registry registry = RegistryFactory.getRegistry();
-			synchronized (_registrySynch)
-			{
-				registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
-			}
-		} //____________________________
+			if (someChildPending)
+				try { Thread.sleep(_pauseTimeMillis); }
+				catch (InterruptedException e) {	return; }
+				
+		} while (someChildPending);
+	} //____________________________
 	
-		public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException 
-		{
-			return null;
-		} //____________________________
+	// Child processes must let us know when they're ready
+	public void update(Observable o, Object arg) 
+	{
+		if (null!=_childrenStarted && (arg instanceof Boolean))
+			_childrenStarted.put(o.getClass(), (Boolean)arg);
+	} //____________________________
 
-		public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException 
-		{
-			return null;
-		} //____________________________
-	
-		// this method will typically run in the invoker's thread 
-		// (which btw might be the same as the run() thread if this not launched in a separate thread
-		public void waitUntilReady() 
-		{
-			boolean someChildPending = true;
-			do
-			{
-				Collection<Boolean> allStarted = null;
-				synchronized (_synchAllReady)
-				{
-					allStarted = (null==_childrenStarted) ? null :  _childrenStarted.values(); 
-				}
-				if (null!= allStarted)
-				{
-					someChildPending = false;
-					for (Boolean curr : allStarted)
-						if (! Boolean.TRUE.equals(curr))
-							someChildPending = true;
-				}
-				if (someChildPending)
-					try { Thread.sleep(_pauseTimeMillis); }
-					catch (InterruptedException e) {	return; }
-					
-			} while (someChildPending);
-		} //____________________________
-		
-		// Child processes must let us know when they're ready
-		public void update(Observable o, Object arg) 
-		{
-			if (null!=_childrenStarted && (arg instanceof Boolean))
-				_childrenStarted.put(o.getClass(), (Boolean)arg);
-		} //____________________________
 
+	private static	Logger 	_logger = Logger.getLogger(DefaultListenerManager.class);
+
+	private ConfigTree		_config;
+	private CommandQueue 	_commandQueue;
+
+	private State 			_status = State.Uninitialised;
+	public State getState() { return _status; }
 	
-		private static	Logger 	_logger = Logger.getLogger(DefaultListenerManager.class);
-		private ConfigTree		_config;
-		private CommandQueue 	_commandQueue;
+	private Map<Class,Boolean> _childrenStarted;
+	
+	
+	private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
+	private String 			_sRldSecs=null;
+	private String 			_parametersName;
 
-		private State 			_status = State.Uninitialised;
-		public State getState() { return _status; }
-		
-		private Map<Class,Boolean> _childrenStarted;
-		
-		
-		private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
-		private String 			_sRldSecs=null;
-		private String 			_parametersName;
+	private boolean 		_reloadRequested, _endRequested;	
+	private long 			_nextReload 	= Long.MAX_VALUE;
+	private long 			_endTimeStamp 	= Long.MAX_VALUE;
 
-		private boolean 		_reloadRequested, _endRequested;	
-		private long 			_nextReload 	= Long.MAX_VALUE;
-		private long 			_endTimeStamp 	= Long.MAX_VALUE;
+	private Object 			_synchRegistry 	= new Short((short)0);
+	private Object 			_synchReload 	= new Short((short)10);
+	private Object			_synchAllReady 	= new Short((short)20);
+
+	protected int 			_defaultReloadIntervalMillis;
 	
-		private Object 			_registrySynch 	= new Short((short)0);
-		private Object 			_synchReload 	= new Short((short)10);
-		private Object			_synchAllReady 	= new Short((short)20);
-		// default interval between parameter reloads = 6 minutes
-		protected int 			_defaultReloadIntervalMillis = 180000 ;
-		
-		protected int			_pauseTimeMillis;
+	protected int			_pauseTimeMillis;
 
-	} // ____________________________________________________________________________
+} // ____________________________________________________________________________

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java	2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java	2006-12-14 18:49:41 UTC (rev 8326)
@@ -0,0 +1,228 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * All ESB 'managed' listeners (gateways and message) should extend this class.
+ * 
+ * <p/> Responsibility as an Observable is to notify Observers when it
+ * is ready to run (Boolean.TRUE) and when it's not (Boolean.FALSE)
+ * 
+ * <br/> It must be a Runnable that will be started in a thread forked and controlled 
+ * by a ListenerManager (the _controller)
+ * 
+ * <br/> It is supposed to manage its own thread pool of 'actions' (the _execService)
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ * 
+ */
+public abstract class AbstractManagedListener extends Observable implements Runnable, Observer
+{
+	public abstract boolean isMessageAware();
+	public abstract boolean initializeRun();
+	public abstract boolean finalizeRun();
+	public abstract void 	waitForEventAndProcess(long maxWaitMillis);
+
+	// disable default constructor
+    private AbstractManagedListener() {}
+    
+	/**
+	 * protected constructor, for use by subclasses
+	 * @param controller ListenerManager - the controlling process
+	 * @param config ConfigTree - Containing 'static' configuration for this instance
+	 * @throws ConfigurationException if checkMyParms() rejects the configuration
+	 */
+    protected AbstractManagedListener(ListenerManager controller, ConfigTree config)
+    	throws ConfigurationException
+    {
+        _pauseLapseInMillis = 50;
+        _defaultMaxThreads	= 1;
+		_controller 		= controller;
+		_config 			= config;    	
+
+		_logger	= Logger.getLogger(this.getClass());
+		checkMyParms();
+    } //________________________________
+    
+    
+    protected void checkMyParms() throws ConfigurationException
+    {
+    	_eprCategoryName= _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+        _eprName		= _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);        
+
+        String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+        if (!Util.isNullString(attr))
+        	try { _maxThreads = Integer.parseInt(attr); }
+        	catch (Exception e) { attr = null; }
+        if (Util.isNullString(attr))
+        {
+        	_maxThreads = _defaultMaxThreads;
+        	_logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+        			+" - Using default value of <"+_maxThreads+">");
+        }
+    } //________________________________
+    
+    protected void registerProcess()  throws RegistryException
+    {
+    	_bRegistered = false;
+    	_controller.register(_config,_epr);
+    	_bRegistered = true;
+    } // ________________________________
+    
+    protected void unregisterProcess() throws RegistryException
+    {
+	    if (_bRegistered)
+	    	_controller.unRegister(_eprCategoryName, _eprName, _epr);    	    	
+    } //________________________________
+    
+    public boolean ensureThreadAvailable()
+    {
+    	while (  (_qRunningThreads >= _maxThreads)
+    			&&_controller.continueLooping())
+    		try { Thread.sleep(_pauseLapseInMillis); }
+    		catch (InterruptedException e) { return false; }
+    	
+		return _controller.continueLooping();
+    } //________________________________
+
+    /**
+     * Wait until the registration process has finished, a PickupCourier has been
+     * obtained, and the thread pool is instantiated
+     */
+	public void waitUntilReady() 
+	{
+		while(null==_execService) 
+			try { Thread.sleep(_pauseLapseInMillis); }
+			catch (InterruptedException e) 
+				{ _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
+	} //________________________________
+
+    // Child threads will send a -1 when their run() method ends;
+    // we need to prevent picking up Messages when there are no available threads in the pool
+	public void update(Observable o, Object arg) 
+	{
+		if (arg instanceof Integer)
+			updateThreadCount((Integer)arg);
+	} //________________________________
+	
+	protected void resetThreadCount()
+	{
+		synchronized (_synchThreads) 
+			{ _qRunningThreads = 0;}
+	} //________________________________
+	
+	protected void updateThreadCount(Integer i)
+	{
+		synchronized (_synchThreads) 
+			{ _qRunningThreads += i.intValue();}
+        _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
+	} //________________________________
+	
+    protected String obtainAttribute(String p_sAtt, String p_sDefault)
+		throws ConfigurationException 
+	{
+		_logger.debug("Reading value for " + p_sAtt);
+		String sVal = _config.getAttribute(p_sAtt);
+		if ((null == sVal) && (null == p_sDefault))
+			throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+	
+		return (null != sVal) ? sVal : p_sDefault;
+	} // ________________________________
+	
+    /**
+     * Loops until the controlling process determines that looping should stop
+     * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
+     * <br/>When one is received, instantiates an action processing pipeline to process it
+     */
+    public void run()
+    {
+        _logger.debug("run() method of "+this.getClass().getSimpleName()
+        		+" started on thread "+Thread.currentThread().getName());
+
+		if (initializeRun())
+		{
+			// Instantiate the thread pool and set active count to zero
+			_execService = Executors.newFixedThreadPool(_maxThreads);
+			resetThreadCount();
+			
+			setChanged();
+			notifyObservers(Boolean.TRUE);
+			
+	    	while (_controller.continueLooping())
+	        {
+	    		// Only pickup a message when a thread is available
+	    		if (! ensureThreadAvailable())
+	    			break;
+	    		
+	    		long lWait = _controller.millisToWait();
+
+	    		// This if() is just in case (it should never happen - it's a safety net)
+	    		if (lWait>0)
+	    			waitForEventAndProcess(lWait);	    		
+	        }
+	    	setChanged();
+	    	notifyObservers(Boolean.FALSE);	    	
+		}
+		
+		finalizeRun();
+
+		// shutdown should wait for all child threads to finish
+		if (null!=_execService)
+			_execService.shutdown();
+		
+        _logger.debug("run() method of "+this.getClass().getSimpleName()
+        		+" finished on thread "+Thread.currentThread().getName());
+    } // _______________________________
+    
+	
+    protected ConfigTree 		_config;
+    protected ListenerManager	_controller;
+
+    protected boolean			_bRegistered;
+	protected String            _eprCategoryName;
+    protected String			_eprName;
+    protected EPR				_epr;
+    
+    protected int				_maxThreads;
+    protected int				_defaultMaxThreads;
+    protected long				_pauseLapseInMillis;
+    protected ExecutorService	_execService;
+
+    private Object _synchThreads = new Short((short)-1);
+    private int					_qRunningThreads;
+
+    protected Logger			_logger;
+}

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-14 18:49:41 UTC (rev 8326)
@@ -23,21 +23,18 @@
 package org.jboss.soa.esb.listeners.message;
 
 import java.lang.reflect.Method;
-import java.util.Observable;
-import java.util.Observer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
-import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
+import org.jboss.soa.esb.listeners.ListenerManager;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
 
 /**
@@ -50,220 +47,117 @@
  * @since Version 4.0
  */
 
-public class MessageAwareListener extends Observable implements Runnable, Observer
+public class MessageAwareListener extends AbstractManagedListener
 {
-	
 	/**
 	 * public constructor
-	 * @param controller EsbListenerController - the controlling process
+	 * @param controller ListenerManager - the controlling process
 	 * @param config ConfigTree - Containing 'static' configuration for this instance
 	 * @throws Exception
 	 */
-    public MessageAwareListener(EsbListenerController controller, ConfigTree config) 
+    public MessageAwareListener(ListenerManager controller, ConfigTree config) 
     	throws ConfigurationException 
     {
-        _millisToWaitForThread = 50;
-        _defaultMaxThreads	= 1;
-		_controller 	= controller;
-		_config 		= config;
-    	
-    	checkMyParms();
+    	super (controller, config);
     } // _______________________________
 
     /**
-     * Wait until the registration process finished, a PickupCourier was obtained
-     * , and the pool thread is instantiated
-     */
-	public void waitUntilReadyToPickup() 
-	{
-		while(null==_execService) 
-			try { Thread.sleep(50); }
-			catch (InterruptedException e) 
-				{ _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
-	} //________________________________
-
-    // Child threads will send a -1  when their run() method ends
-    // we need to prevent picking up Messages when there are no available threads in pool
-	public void update(Observable o, Object arg) 
-	{
-		if (arg instanceof Integer)
-		{
-			_qRunningThreads += ((Integer)arg).intValue();
-	        _logger.debug("Thread returned to pool("+_qRunningThreads+"/"+_maxThreads);
-		}
-	} //________________________________
-	
-    /**
-     * Loops until controlling process determines
-     * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
-     * <br/>When one is received, instantiates an action processing pipeline to process it
-     */
-    public void run()
-    {
-        _logger.debug("run() method of "+this.getClass().getSimpleName()
-        		+" started on thread "+Thread.currentThread().getName());
-
-        boolean bRegisterOK = false;
-        try   
-        {
-        	registerProcess();
-        	bRegisterOK = true;
-        }
-		catch (Exception re) 
-			{ _logger.fatal("Could not register service " + re.getLocalizedMessage(),re); }
-		
-		if (bRegisterOK)
-		{
-			// Instantiate the pool thread and set active count to zero
-			_execService = Executors.newFixedThreadPool(_maxThreads);
-			_qRunningThreads = 0;
-			
-			setChanged();
-			notifyObservers(Boolean.TRUE);
-			
-	    	while (_controller.continueLooping())
-	        {
-	    		// Check to see if there are available threads in pool
-	    		if (_qRunningThreads >= _maxThreads)
-	    			try 
-	    			{
-	    				Thread.sleep(_millisToWaitForThread);
-	    				continue;
-	    			}
-	    			catch (InterruptedException e) 
-	    			{
-	    				break; 
-	    			}
-
-	    		// Only pickup a message when a thread is available
-	    		long lWait = _controller.millisToWait();
-
-	    		// Just in case (this should never happen - it's a safety net)
-	    		if (lWait< 1)
-	    			continue;
-	    		
-	    		Message message = null;
-	    		try { message = (lWait > 0 ) ? _pickUpCourier.pickup(lWait) : null; }
-	    		catch (CourierTimeoutException e) { continue; }
-	    		catch (CourierException e) 
-	    		{
-	    			_logger.error(e);
-	    			continue;
-	    		}
-
-	        	if (null!=message)
-	        	{	
-	        		ActionProcessingPipeline chain = null;
-	
-	        		try	
-	        		{ 
-	        			chain = new ActionProcessingPipeline(message,_config);
-	        			chain.addObserver(this);
-	        			_qRunningThreads++;
-	        	        _logger.debug("Thread claimed from pool("+_qRunningThreads+"/"+_maxThreads);
-	        			_execService.execute(chain);
-	        		}
-	        		catch (Exception e)	{	_logger.error(e); 	continue; }
-	
-	        	}
-	        }
-	    	setChanged();
-	    	notifyObservers(Boolean.FALSE);	    	
-		}
-		
-    	try { unregisterProcess(); }
-	   	catch (Exception re) 
-	   	{
-	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
-	   	}
-		// shutdown should wait for all child threads to finish
-		if (null!=_execService)
-			_execService.shutdown();
-		
-        _logger.debug("run() method of "+this.getClass().getSimpleName()
-        		+" finished on thread "+Thread.currentThread().getName());
-    } // _______________________________
-    
-    /**
      * Check for mandatory and optional attributes in parameter tree
      * 
-     * @throws Exception -
+     * @throws ConfigurationException -
      *             if mandatory atts are not right or actionClass not in
      *             classpath
      */
     protected void checkMyParms() throws ConfigurationException 
     {
-        _eprCategoryName= obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
-        _eprName		= obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);
-        
-        String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
-        if (!Util.isNullString(attr))
-        	try { _maxThreads = Integer.parseInt(attr); }
-        	catch (Exception e) { attr = null; }
-        if (Util.isNullString(attr))
-        {
-        	_maxThreads = _defaultMaxThreads;
-        	_logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
-        			+" - Using default value of <"+_maxThreads+">");
-        }
+    	super.checkMyParms();
 
+    	if (Util.isNullString(_eprCategoryName))
+    		throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+    	if (Util.isNullString(_eprName))
+    		throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_NAME_TAG);
     } // ________________________________
     
-    private void registerProcess()  throws Exception
-    {
-    	_bRegistered = false;
-    	_controller.register(_config,_epr);
-    	_bRegistered = true;
-    	_pickUpCourier = CourierFactory.getPickupCourier(_epr);
-    } // ________________________________
+	@Override
+	public boolean isMessageAware() { return true; }
+	
+	
+	@Override
+	public boolean finalizeRun() 
+	{
+    	try 
+    	{ 
+        	if (null!=_pickUpCourier)
+    	    	try
+    	    	{
+    	    		Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
+    	    		cleanMethod.invoke(_pickUpCourier, new Object[] {});
+    	    	}
+    	    	catch (NoSuchMethodException e) {/*  OK  Just don't invoke it  */  }
+    	    	catch (Exception e)
+    	    	{
+    	    		_logger.error("Problems invoking courier.clean() Method",e);
+    	    	}
 
-    private void unregisterProcess()  throws Exception
-    {
-    	if (null!=_pickUpCourier)
-	    	try
-	    	{
-	    		Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
-	    		cleanMethod.invoke(_pickUpCourier, new Object[] {});
-	    	}
-	    	catch (NoSuchMethodException e) {/*  OK  Just don't invoke it  */  }
-	    	catch (Exception e)
-	    	{
-	    		_logger.error("Problems invoking courier.clean() Method",e);
-	    	}
+    	    unregisterProcess();
+    		return true; 
+    	}
+	   	catch (RegistryException re) 
+	   	{
+	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
+	   		return false;
+	   	}
 
-	    if (_bRegistered)
-	    	_controller.unRegister(_eprCategoryName, _eprName, _epr);    	
+	} //________________________________
 
-    } // ________________________________
+	@Override
+	public boolean initializeRun() 
+	{
+        try   
+        {
+        	registerProcess();
+        	_pickUpCourier = CourierFactory.getPickupCourier(_epr);
+        	return true;
+        }
+		catch (Exception re) 
+		{ 
+			_logger.fatal("Could not register service " + re.getLocalizedMessage(),re);
+			return false;
+		}
+		
+	} //________________________________
 
-    protected String obtainAttribute(String p_sAtt, String p_sDefault)
-		throws ConfigurationException 
+	@Override
+	public void waitForEventAndProcess(long maxWaitMillis) 
 	{
-    	_logger.info("Reading value for " + p_sAtt);
-    	String sVal = _config.getAttribute(p_sAtt);
-    	if ((null == sVal) && (null == p_sDefault))
-    		throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
-	
-    	return (null != sVal) ? sVal : p_sDefault;
-	} // ________________________________
-    
+		Message message = null;
+		try { message = (maxWaitMillis > 0 ) ? _pickUpCourier.pickup(maxWaitMillis) : null; }
+		catch (CourierTimeoutException e) { return; }
+		catch (CourierException e) 
+		{
+			_logger.error(e);
+			return;
+		}
 
-	protected ConfigTree 			_config = null;
-	protected EsbListenerController _controller = null;
+    	if (null!=message)
+    	{	
+    		ActionProcessingPipeline chain = null;
 
-	protected String            _eprCategoryName;
-    protected String			_eprName;
-    protected EPR				_epr;
-    
-    protected int				_maxThreads;
-    protected int				_qRunningThreads;
-    protected int				_defaultMaxThreads;
-    protected long				_millisToWaitForThread;
-    protected ExecutorService	_execService;
-    protected boolean			_bRegistered;
-    
-    protected PickUpOnlyCourier _pickUpCourier;
+    		try	
+    		{ 
+    			chain = new ActionProcessingPipeline(message,_config);
+    			chain.addObserver(this);
+    			updateThreadCount(+1);
+    			_execService.execute(chain);
+    		}
+    		catch (Exception e)	
+    		{	
+    			_logger.error(e); 	
+    			return; 
+    		}
+    	}
+		
+	} //________________________________
 
-    protected static transient Logger _logger = Logger.getLogger(MessageAwareListener.class);
-
+    protected PickUpOnlyCourier _pickUpCourier;
 } 




More information about the jboss-svn-commits mailing list