[jboss-svn-commits] JBL Code SVN: r8326 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: internal/soa/esb/listeners soa/esb/listeners soa/esb/listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Dec 14 13:49:48 EST 2006
Author: estebanschifman
Date: 2006-12-14 13:49:41 -0500 (Thu, 14 Dec 2006)
New Revision: 8326
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Work on new managed listeners and ListenerManager
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-14 18:49:41 UTC (rev 8326)
@@ -35,11 +35,13 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.command.CommandQueue;
import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.util.EPRManager;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
import org.jboss.soa.esb.listeners.ListenerManager;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.State;
@@ -81,556 +83,583 @@
public class DefaultListenerManager implements ListenerManager
{
- /**
- * Construct a Listener Manager and configure it using the named repository entry
- *
- * @param parametersName
- * Name of the Repository entry containing the configuration.
- * @throws Exception
- * Unable to load/use the named configuration.
- */
- public DefaultListenerManager(String parametersName) throws Exception
- {
- this(DefaultListenerManager.getListenerConfig(parametersName));
- _parametersName = parametersName;
- } //____________________________
-
- /**
- * Construct a Listener Controller using the specified listener configuration.
- *
- * @param config
- * The configuration.
- * @throws Exception
- * Unable to load/use the supplied configuration.
- */
- public DefaultListenerManager(ConfigTree config) throws Exception
+ /**
+ * Construct a Listener Manager and configure it using the named repository entry
+ *
+ * @param parametersName
+ * Name of the Repository entry containing the configuration.
+ * @throws Exception
+ * Unable to load/use the named configuration.
+ */
+ public DefaultListenerManager(String parametersName) throws Exception
+ {
+ this(DefaultListenerManager.getListenerConfig(parametersName));
+ _parametersName = parametersName;
+ } //____________________________
+
+ /**
+ * Construct a Listener Controller using the specified listener configuration.
+ *
+ * @param config
+ * The configuration.
+ * @throws Exception
+ * Unable to load/use the supplied configuration.
+ */
+ public DefaultListenerManager(ConfigTree config) throws Exception
+ {
+ // default sleep time when waiting for events to happen in other threads
+ _pauseTimeMillis = 50;
+ // default interval between parameter reloads = 3 minutes (180000 ms)
+ _defaultReloadIntervalMillis = 180000;
+
+ _config = config;
+ _status = State.Loading_parameters;
+
+ try { checkParms(_config); }
+ catch (Exception e)
{
- _pauseTimeMillis = 50;
- _config = config;
- _status = State.Loading_parameters;
-
- try { checkParms(_config); }
- catch (Exception e)
- {
- String configSource = config.getAttribute("configSource");
-
- _status = State.Exception_thrown;
- _status.setThrowable(e);
- _logger.fatal("Listener configuration and startup error. Config Source: "
- + (configSource != null ? configSource
- : "unknown"), e);
-
- throw e;
- }
- } //____________________________
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
- */
- /**
- * Load the named listener configuration from the configured parameter
- * repository.
- *
- * @param reposParam
- * The name of the repository entry containing the Listener
- * configuration.
- * @return Listener Configuration as {@link ConfigTree}.
- * @throws IOException
- * Unable to access the repository.
- * @throws ParamRepositoryException
- * Unable to access the configuration in the repository.
- * @throws SAXException
- * Unable to parse the configuration.
- */
- private static ConfigTree getListenerConfig(String reposParam)
- throws IOException, ParamRepositoryException, SAXException {
- String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
- ConfigTree config = ConfigTree.fromXml(sXml);
-
- config.setAttribute("configSource", "param-repository:" + reposParam);
-
- return config;
+ String configSource = config.getAttribute("configSource");
+
+ _status = State.Exception_thrown;
+ _status.setThrowable(e);
+ _logger.fatal("Listener configuration and startup error. Config Source: "
+ + (configSource != null ? configSource
+ : "unknown"), e);
+
+ throw e;
}
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
- */
- public void checkParms(ConfigTree p_oP) throws Exception
+ } //____________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#getState()
+ */
+ /**
+ * Load the named listener configuration from the configured parameter
+ * repository.
+ *
+ * @param reposParam
+ * The name of the repository entry containing the Listener
+ * configuration.
+ * @return Listener Configuration as {@link ConfigTree}.
+ * @throws IOException
+ * Unable to access the repository.
+ * @throws ParamRepositoryException
+ * Unable to access the configuration in the repository.
+ * @throws SAXException
+ * Unable to parse the configuration.
+ */
+ private static ConfigTree getListenerConfig(String reposParam)
+ throws IOException, ParamRepositoryException, SAXException
+ {
+ String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
+ ConfigTree config = ConfigTree.fromXml(sXml);
+
+ config.setAttribute("configSource", "param-repository:" + reposParam);
+
+ return config;
+ } //____________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#checkParms(org.jboss.soa.esb.helpers.ConfigTree)
+ */
+ public void checkParms(ConfigTree p_oP) throws Exception
+ {
+ // We've just loaded - set to false until next reload requested
+ _reloadRequested = false;
+ if (null!=_parametersName)
{
- // We've just loaded - set to false until next reload requested
- _reloadRequested = false;
- if (null!=_parametersName)
- {
- File file = new File(_parametersName);
- if (file.exists())
- _paramFileTimeStamps.put(_parametersName, file.lastModified());
- }
-
+ File file = new File(_parametersName);
+ if (file.exists())
+ _paramFileTimeStamps.put(_parametersName, file.lastModified());
+ }
+
- _commandQueue = createCommandQueue(p_oP);
-
- // Open the command queue...
- if (null!=_commandQueue)
- _commandQueue.open(p_oP);
-
- setNextReloadTime(p_oP);
+ _commandQueue = createCommandQueue(p_oP);
- // if PARM_END_TIME not set try to run forever
- // not a good practice if command queue is not set
- // Expected date format is "yyyyMMdd hh:mm:ss"
- String sEndT = p_oP.getAttribute(PARM_END_TIME);
- _endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
- sEndT).getTime();
-
- } // ________________________________
-
- private void setNextReloadTime(ConfigTree tree)
+ // Open the command queue...
+ if (null!=_commandQueue)
+ _commandQueue.open(p_oP);
+
+ setNextReloadTime(p_oP);
+
+ // if PARM_END_TIME not set try to run forever
+ // not a good practice if command queue is not set
+ // Expected date format is "yyyyMMdd hh:mm:ss"
+ String sEndT = p_oP.getAttribute(PARM_END_TIME);
+ _endTimeStamp = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
+ sEndT).getTime();
+
+ for (ConfigTree tree : _config.getAllChildren())
{
- // if RELOAD_SECONDS_TAG not set, and no command queue
- // then reload every 10 minutes
- // If there is a command queue, run until command is received
+ String gateway = tree.getAttribute(PARM_GATEWAY_CLASS);
+ String listener = tree.getAttribute(PARM_LISTENER_CLASS);
+ if (null==gateway || null==listener)
+ continue;
+ StringBuilder sb = new StringBuilder()
+ .append("Either "+PARM_GATEWAY_CLASS+" or "+PARM_LISTENER_CLASS)
+ .append(" can be specified, but not both")
+ ;
+ _logger.error("Child listener class must be gateway or listener - can't be both");
+ throw new ConfigurationException(sb.toString());
+ }
+
+ } // ________________________________
- _sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
- synchronized (_synchReload)
- {
- _nextReload = (null != _sRldSecs)
- ? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
- : (null == _commandQueue)
- ? Long.MAX_VALUE
- : System.currentTimeMillis() + _defaultReloadIntervalMillis;
- }
-
-
- if (null==_sRldSecs)
- {
- String sMsg = (null==_commandQueue)
- ? " - Using default of "+_sRldSecs
- : " - Listener will run until stopped by command sent to queue";
- _logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
- }
+ private void setNextReloadTime(ConfigTree tree)
+ {
+ // if RELOAD_SECONDS_TAG not set, and no command queue
+ // then reload every 10 minutes
+ // If there is a command queue, run until command is received
- } //____________________________
+ _sRldSecs = tree.getAttribute(PARM_RELOAD_SECS);
+ synchronized (_synchReload)
+ {
+ _nextReload = (null != _sRldSecs)
+ ? System.currentTimeMillis() + 1000 * Long.parseLong(_sRldSecs)
+ : (null == _commandQueue)
+ ? Long.MAX_VALUE
+ : System.currentTimeMillis() + _defaultReloadIntervalMillis;
+ }
+
+
+ if (null==_sRldSecs)
+ {
+ String sMsg = (null==_commandQueue)
+ ? " - Using default of "+_sRldSecs
+ : " - Listener will run until stopped by command sent to queue";
+ _logger.warn("No value specified for: "+PARM_RELOAD_SECS + sMsg);
+ }
- /**
- * Factory method for creating the command queue.
- * @param config DefaultListenerManager config.
- * @return DefaultListenerManager CommandQueue instance.
- */
- private CommandQueue createCommandQueue(ConfigTree config)
+ } //____________________________
+
+ /**
+ * Factory method for creating the command queue.
+ * @param config DefaultListenerManager config.
+ * @return DefaultListenerManager CommandQueue instance.
+ */
+ private CommandQueue createCommandQueue(ConfigTree config)
+ {
+ String commandQueueClass = config.getAttribute("command-queue-class");
+ if(commandQueueClass != null)
{
- String commandQueueClass = config.getAttribute("command-queue-class");
- if(commandQueueClass != null)
+ try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
+ catch (Exception e)
{
- try { return (CommandQueue) Class.forName(commandQueueClass).newInstance(); }
- catch (Exception e)
- {
- _logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
- }
+ _logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
}
-
- return null;
- } //____________________________
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
- */
- public void run()
+ }
+
+ return null;
+ } //____________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#run()
+ */
+ public void run()
+ {
+ boolean relaunch = true;
+ while (endNotRequested())
{
- boolean relaunch = true;
- while (endNotRequested())
+ _status = State.Running;
+ if (relaunch)
{
- _status = State.Running;
- if (relaunch)
+ synchronized (_synchAllReady)
{
- synchronized (_synchAllReady)
+ _childrenStarted = new HashMap<Class, Boolean>();
+ for (ConfigTree oCurr : _config.getAllChildren())
{
- _childrenStarted = new HashMap<Class, Boolean>();
- for (ConfigTree oCurr : _config.getAllChildren())
- {
- String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
- if (Util.isNullString(sClass))
- continue;
- tryToLaunchChildListener(oCurr, sClass);
- }
+ String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
+ if (Util.isNullString(sClass))
+ sClass = oCurr.getAttribute(PARM_GATEWAY_CLASS);
+ if (Util.isNullString(sClass))
+ continue;
+
+ tryToLaunchChildListener(oCurr, sClass);
}
- waitUntilReady();
}
- _status = State.Ready;
-
- waitForCmdOrSleep();
-
- if (endRequested())
+ waitUntilReady();
+ }
+ _status = State.Ready;
+
+ waitForCmdOrSleep();
+
+ if (endRequested())
+ {
+ break;
+ }
+
+ relaunch = false;
+ if (_parametersName != null && isReloadNeeded())
+ try
{
- break;
+ _status = State.Loading_parameters;
+ _logger.debug("Reloading parameters _____________________________________________________");
+ ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
+ checkParms(oNew);
+ _config = oNew;
+ relaunch = true;
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to reload parameters"
+ + " - Continuing with cached version", e);
}
+ }
+ // m_oState = State.Shutting_down;
- relaunch = false;
- if (_parametersName != null && isReloadNeeded())
- try
- {
- _status = State.Loading_parameters;
- _logger.debug("Reloading parameters _____________________________________________________");
- ConfigTree oNew = DefaultListenerManager.getListenerConfig(_parametersName);
- checkParms(oNew);
- _config = oNew;
- relaunch = true;
- }
- catch (Exception e)
- {
- _logger.error("Failed to reload parameters"
- + " - Continuing with cached version", e);
- }
- }
- // m_oState = State.Shutting_down;
-
- _status = State.Done_OK;
- _status.setCompletionCode(0);
- _logger.debug("Finishing_____________________________________________________");
-
- // Close the command queue...
- try {
- if (null != _commandQueue)
- _commandQueue.close();
- } catch (CommandQueueException e) {
- _logger.error("Error closing Command Queue.", e);
- }
- } // ________________________________
-
- private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName)
+ _status = State.Done_OK;
+ _status.setCompletionCode(0);
+ _logger.debug("Finishing_____________________________________________________");
+
+ // Close the command queue...
+ try {
+ if (null != _commandQueue)
+ _commandQueue.close();
+ } catch (CommandQueueException e) {
+ _logger.error("Error closing Command Queue.", e);
+ }
+ } // ________________________________
+
+ private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName)
+ {
+ try
{
+ Class oListener = Class.forName(p_sClassName);
+ Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
+ AbstractManagedListener listener = (AbstractManagedListener)
+ oConst.newInstance(new Object[] { this,p_oP});
+
+ _childrenStarted.put(listener.getClass(), Boolean.FALSE);
+ ((Observable)listener).addObserver(this);
+ new Thread(listener).start();
+ }
+ catch (Throwable e)
+ {
+ _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ }
+ } // ________________________________
+
+ public long millisToWait()
+ {
+ synchronized (_synchReload)
+ {
+ return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
+ }
+ } // ________________________________
+
+ private void waitForCmdOrSleep()
+ {
+ long lToGo = millisToWait();
+
+ if (null == _commandQueue)
+ {
+ _logger.debug("About to sleep " + lToGo);
+ // No command queue nor topic - Just sleep until time
+ // exhausted, or thread interrupted
try
{
- Class oListener = Class.forName(p_sClassName);
- Constructor oConst = oListener.getConstructor(new Class[] {ListenerManager.class, ConfigTree.class});
- Runnable listener = (Runnable)oConst.newInstance(new Object[] { this,p_oP});
-
- // if you wish to monitor that child listener is ready, make it extend Observer
- // and update observers with a Boolean.TRUE when it's ready
- if (listener instanceof Observable)
- {
- _childrenStarted.put(listener.getClass(), Boolean.FALSE);
- ((Observable)listener).addObserver(this);
- }
-
- new Thread(listener).start();
- }
- catch (Throwable e)
+ while ((lToGo=millisToWait()) > 0)
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
{
- _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ _endTimeStamp = 0; // mark as end requested and return
}
- } // ________________________________
-
- public long millisToWait()
+ return;
+ }
+
+ // Wait for commands until time exhausted or command received
+ // Note that received commands might change time variables (reload/end)
+ // that's why time to go is recalculated on each cycle
+ while ((lToGo = millisToWait()) > 0)
{
- synchronized (_synchReload)
+ try
{
- return Math.min(_nextReload, _endTimeStamp) - System.currentTimeMillis();
- }
- } // ________________________________
-
- private void waitForCmdOrSleep() {
- long lToGo = millisToWait();
-
- if (null == _commandQueue) {
- _logger.debug("About to sleep " + lToGo);
- // No command queue nor topic - Just sleep until time
- // exhausted, or thread interrupted
- try {
- while ((lToGo=millisToWait()) > 0)
- Thread.sleep(500);
- } catch (InterruptedException e) {
- _endTimeStamp = 0; // mark as end requested and return
+ _logger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = _commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
+ return;
}
- return;
- }
-
- // Wait for commands until time exhausted or command received
- // Note that received commands might change time variables (reload/end)
- // that's why time to go is recalculated on each cycle
- while ((lToGo = millisToWait()) > 0) {
- try {
- _logger.info("Waiting for command ... timeout=" + lToGo + " millis");
-
- String oM = _commandQueue.receiveCommand(lToGo);
- if (null == oM) {
- return;
- }
- processCommand(oM);
- if (endRequested() || isReloadNeeded()) {
- break;
- }
- } catch (CommandQueueException eJ) {
- _logger.info("receive on command queue failed", eJ);
+ processCommand(oM);
+ if (endRequested() || isReloadNeeded()) {
+ break;
}
+ }
+ catch (CommandQueueException eJ)
+ {
+ _logger.info("receive on command queue failed", eJ);
}
- } // ________________________________
-
- /**
- * Processes the command that has been received in the command queue (or
- * topic) <p/>m_bEndRequested, m_bReloadRequested, and m_lEndTime could be
- * changed
- *
- * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
- * width="400"/> </COLGROUP>
- * <TR>
- * <TD align="center">message text</TD>
- * <TD align="center">effect</TD>
- * </TR>
- * <TR>
- * <TD>shutdown*</TD>
- * <TD>End time will be immediately set to 'now' - quiesce process will
- * start - Child threads will be allowed to finish normally</TD>
- * </TR>
- * <TR>
- * <TD>reload param*</TD>
- * <TD>Parameters will be immediately reloaded, and listener reconfigured
- * with new values</TD>
- * </TR>
- * <TR>
- * <TD>endTime yyyyMMdd hh:mm:ss</TD>
- * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
- * end of day assumed (23:59:59)</TD>
- * </TR>
- * </TABLE> * startsWith() <p/>
- *
- * @param p_oMsg
- * Message received from the command queue.
- *
- */
- private void processCommand(String sTxt)
- {
- if (null == sTxt)
- return;
-
- String sLow = sTxt.trim().toLowerCase();
- if (sLow.startsWith("shutdown")) {
- _endRequested = true;
- _logger.info("Shutdown has been requested");
- return;
- }
- if (sLow.startsWith("reload param")) {
- _reloadRequested = true;
- _logger
- .info("Request for parameter reload has been received");
- return;
- }
- String[] sa = sLow.split("\\s+");
- if (sa.length > 1 && "endtime".equals(sa[0])) {
- try {
- String sDate = sa[1];
- String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
- : sa[2];
- Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
- _logger.info("New end date set to : " + oEnd);
- _endTimeStamp = oEnd.getTime();
- } catch (Exception eDat) {
- _logger.info("Problems with endTime command", eDat);
- }
- }
- } // ________________________________
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
- */
- public void requestEnd()
- {
- _endRequested=true;
- _endTimeStamp = 0;
}
+ } // ________________________________
+
+ /**
+ * Processes the command that has been received in the command queue (or
+ * topic) <p/>_endRequested, _reloadRequested, and _endTimeStamp could be
+ * changed
+ *
+ * <p/> <p/><TABLE border="1"> <COLGROUP> <COL width="200"/> <COL
+ * width="400"/> </COLGROUP>
+ * <TR>
+ * <TD align="center">message text</TD>
+ * <TD align="center">effect</TD>
+ * </TR>
+ * <TR>
+ * <TD>shutdown*</TD>
+ * <TD>End time will be immediately set to 'now' - quiesce process will
+ * start - Child threads will be allowed to finish normally</TD>
+ * </TR>
+ * <TR>
+ * <TD>reload param*</TD>
+ * <TD>Parameters will be immediately reloaded, and listener reconfigured
+ * with new values</TD>
+ * </TR>
+ * <TR>
+ * <TD>endTime yyyyMMdd hh:mm:ss</TD>
+ * <TD>End time will be set to new value. If hh:mm:ss is not supplied =>
+ * end of day assumed (23:59:59)</TD>
+ * </TR>
+ * </TABLE> * startsWith() <p/>
+ *
+ * @param sTxt
+ * Message received from the command queue.
+ *
+ */
+ private void processCommand(String sTxt)
+ {
+ if (null == sTxt)
+ return;
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
- */
- public boolean endRequested()
- {
- return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ _endRequested = true;
+ _logger.info("Shutdown has been requested");
+ return;
}
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
- */
- public boolean endNotRequested()
- {
- return !endRequested();
+ if (sLow.startsWith("reload param")) {
+ _reloadRequested = true;
+ _logger
+ .info("Request for parameter reload has been received");
+ return;
}
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+ _logger.info("New end date set to : " + oEnd);
+ _endTimeStamp = oEnd.getTime();
+ } catch (Exception eDat) {
+ _logger.info("Problems with endTime command", eDat);
+ }
+ }
+ } // ________________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#requestEnd()
+ */
+ public void requestEnd()
+ {
+ _endRequested=true;
+ _endTimeStamp = 0;
+ }
- public boolean isReloadNeeded()
- {
- // command to reload was received - force reload
- if (_reloadRequested)
- return refreshNextReload();
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endRequested()
+ */
+ public boolean endRequested()
+ {
+ return _endRequested || System.currentTimeMillis() >= _endTimeStamp;
+ }
- // Still not time to reload
- if (System.currentTimeMillis() < _nextReload)
- return false;
-
- if (null==_parametersName)
- return refreshNextReload();
-
- File paramFile = new File(_parametersName);
- if (! paramFile.exists())
- return refreshNextReload();
-
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#endNotRequested()
+ */
+ public boolean endNotRequested()
+ {
+ return !endRequested();
+ }
+
+ public boolean isReloadNeeded()
+ {
+ // command to reload was received - force reload
+ if (_reloadRequested)
+ return refreshNextReload();
+
+ // Still not time to reload
+ if (System.currentTimeMillis() < _nextReload)
+ return false;
+
+ if (null==_parametersName)
+ return refreshNextReload();
+
+ File paramFile = new File(_parametersName);
+ if (! paramFile.exists())
+ return refreshNextReload();
+
// check the TS on the file.
- Long previousTimeStamp = _paramFileTimeStamps.get(_parametersName);
- if (null==previousTimeStamp)
- return refreshNextReload();
+ Long previousTimeStamp = _paramFileTimeStamps.get(_parametersName);
+ if (null==previousTimeStamp)
+ return refreshNextReload();
- Long currentTimeStamp = paramFile.lastModified();
- if (! previousTimeStamp.equals(currentTimeStamp))
- return refreshNextReload();
+ Long currentTimeStamp = paramFile.lastModified();
+ if (! previousTimeStamp.equals(currentTimeStamp))
+ return refreshNextReload();
- setNextReloadTime(_config);
- return false;
+ setNextReloadTime(_config);
+ return false;
+ }
+
+ private boolean refreshNextReload()
+ {
+ setNextReloadTime(_config);
+ return true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
+ */
+ public boolean continueLooping()
+ {
+ return (endNotRequested() && !isReloadNeeded());
+ } // ________________________________
+
+ public static EPRManager getEprManager()
+ {
+ PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+ String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
+ return EPRManager.getInstance(sDir);
+ }
+
+ @Deprecated
+ private void register (String name, EPR address)
+ {
+ try { getEprManager().saveEPR(name,address); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot register service",e);
}
-
- private boolean refreshNextReload()
+ } // ________________________________
+ @Deprecated
+ private void unRegister (String name)
+ {
+ try { getEprManager().removeEPR(name); }
+ catch (IOException e)
{
- setNextReloadTime(_config);
- return true;
+ _logger.fatal("Cannot un-register service",e);
}
+ } // ________________________________
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.EsbListenerController#continueLooping()
- */
- public boolean continueLooping()
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
+ */
+ public void register(ConfigTree config , EPR epr) throws RegistryException
+ {
+ String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ String serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
{
- return (endNotRequested() && !isReloadNeeded());
- } // ________________________________
-
- public static EPRManager getEprManager()
+ register(serviceName,epr);
+ return;
+ }
+ String serviceDescription = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+ String eprDescription = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+ Registry registry = RegistryFactory.getRegistry();
+ synchronized (_synchRegistry)
{
- PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
- String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
- return EPRManager.getInstance(sDir);
+ registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
}
-
- @Deprecated
- private void register (String name, EPR address)
+ } //____________________________
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
+ */
+ public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+ {
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
{
- try { getEprManager().saveEPR(name,address); }
- catch (IOException e)
- {
- _logger.fatal("Cannot register service",e);
- }
- } // ________________________________
- @Deprecated
- private void unRegister (String name)
+ unRegister(serviceName);
+ return;
+ }
+ Registry registry = RegistryFactory.getRegistry();
+ synchronized (_synchRegistry)
{
- try { getEprManager().removeEPR(name); }
- catch (IOException e)
- {
- _logger.fatal("Cannot un-register service",e);
- }
- } // ________________________________
+ registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+ }
+ } //____________________________
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.ListenerManager#register(org.jboss.soa.esb.helpers.ConfigTree, org.jboss.soa.esb.addressing.EPR)
- */
- public void register(ConfigTree config , EPR epr) throws RegistryException
+ public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException
+ {
+ return null;
+ } //____________________________
+
+ public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException
+ {
+ return null;
+ } //____________________________
+
+ // this method will typically run in the invoker's thread
+ // (which might be the same as the run() thread if this is not launched in a separate thread)
+ public void waitUntilReady()
+ {
+ boolean someChildPending = true;
+ do
{
- String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- String serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+ Collection<Boolean> allStarted = null;
+ synchronized (_synchAllReady)
{
- register(serviceName,epr);
- return;
+ allStarted = (null==_childrenStarted) ? null : _childrenStarted.values();
}
- String serviceDescription = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
- String eprDescription = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
- Registry registry = RegistryFactory.getRegistry();
- synchronized (_registrySynch)
+ if (null!= allStarted)
{
- registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+ someChildPending = false;
+ for (Boolean curr : allStarted)
+ if (! Boolean.TRUE.equals(curr))
+ someChildPending = true;
}
- } //____________________________
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.message.ListenerManager#unRegister(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
- */
- public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
- {
- if ("eprManager".equalsIgnoreCase(serviceCategoryName))
- {
- unRegister(serviceName);
- return;
- }
- Registry registry = RegistryFactory.getRegistry();
- synchronized (_registrySynch)
- {
- registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
- }
- } //____________________________
+ if (someChildPending)
+ try { Thread.sleep(_pauseTimeMillis); }
+ catch (InterruptedException e) { return; }
+
+ } while (someChildPending);
+ } //____________________________
- public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException
- {
- return null;
- } //____________________________
+ // Child processes must let us know when they're ready
+ public void update(Observable o, Object arg)
+ {
+ if (null!=_childrenStarted && (arg instanceof Boolean))
+ _childrenStarted.put(o.getClass(), (Boolean)arg);
+ } //____________________________
- public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException
- {
- return null;
- } //____________________________
-
- // this method will typically run in the invoker's thread
- // (which btw might be the same as the run() thread if this not launched in a separate thread
- public void waitUntilReady()
- {
- boolean someChildPending = true;
- do
- {
- Collection<Boolean> allStarted = null;
- synchronized (_synchAllReady)
- {
- allStarted = (null==_childrenStarted) ? null : _childrenStarted.values();
- }
- if (null!= allStarted)
- {
- someChildPending = false;
- for (Boolean curr : allStarted)
- if (! Boolean.TRUE.equals(curr))
- someChildPending = true;
- }
- if (someChildPending)
- try { Thread.sleep(_pauseTimeMillis); }
- catch (InterruptedException e) { return; }
-
- } while (someChildPending);
- } //____________________________
-
- // Child processes must let us know when they're ready
- public void update(Observable o, Object arg)
- {
- if (null!=_childrenStarted && (arg instanceof Boolean))
- _childrenStarted.put(o.getClass(), (Boolean)arg);
- } //____________________________
+ private static Logger _logger = Logger.getLogger(DefaultListenerManager.class);
+
+ private ConfigTree _config;
+ private CommandQueue _commandQueue;
+
+ private State _status = State.Uninitialised;
+ public State getState() { return _status; }
- private static Logger _logger = Logger.getLogger(DefaultListenerManager.class);
- private ConfigTree _config;
- private CommandQueue _commandQueue;
+ private Map<Class,Boolean> _childrenStarted;
+
+
+ private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
+ private String _sRldSecs=null;
+ private String _parametersName;
- private State _status = State.Uninitialised;
- public State getState() { return _status; }
-
- private Map<Class,Boolean> _childrenStarted;
-
-
- private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
- private String _sRldSecs=null;
- private String _parametersName;
+ private boolean _reloadRequested, _endRequested;
+ private long _nextReload = Long.MAX_VALUE;
+ private long _endTimeStamp = Long.MAX_VALUE;
- private boolean _reloadRequested, _endRequested;
- private long _nextReload = Long.MAX_VALUE;
- private long _endTimeStamp = Long.MAX_VALUE;
+ private Object _synchRegistry = new Short((short)0);
+ private Object _synchReload = new Short((short)10);
+ private Object _synchAllReady = new Short((short)20);
+
+ protected int _defaultReloadIntervalMillis;
- private Object _registrySynch = new Short((short)0);
- private Object _synchReload = new Short((short)10);
- private Object _synchAllReady = new Short((short)20);
- // default interval between parameter reloads = 6 minutes
- protected int _defaultReloadIntervalMillis = 180000 ;
-
- protected int _pauseTimeMillis;
+ protected int _pauseTimeMillis;
- } // ____________________________________________________________________________
+} // ____________________________________________________________________________
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java 2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java 2006-12-14 18:49:41 UTC (rev 8326)
@@ -0,0 +1,228 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * All ESB 'managed' listeners (gateways and message) should extend this class.
+ *
+ * <p/> Responsibility as an Observable is to notify Observers when it
+ * is ready to run (Boolean.TRUE) and when it's not (Boolean.FALSE)
+ *
+ * <br/> It must be a Runnable that will be started in a thread forked and controlled
+ * by a ListenerManager (the _controller)
+ *
+ * <br/> It is supposed to manage its own thread pool of 'actions' (the _execService)
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+public abstract class AbstractManagedListener extends Observable implements Runnable, Observer
+{
+ public abstract boolean isMessageAware();
+ public abstract boolean initializeRun();
+ public abstract boolean finalizeRun();
+ public abstract void waitForEventAndProcess(long maxWaitMillis);
+
+ // disable default constructor
+ private AbstractManagedListener() {}
+
+ /**
+ * protected constructor (for use by subclasses)
+ * @param controller ListenerManager - the controlling process
+ * @param config ConfigTree - Containing 'static' configuration for this instance
+ * @throws ConfigurationException if checkMyParms() rejects the configuration
+ */
+ protected AbstractManagedListener(ListenerManager controller, ConfigTree config)
+ throws ConfigurationException
+ {
+ _pauseLapseInMillis = 50;
+ _defaultMaxThreads = 1;
+ _controller = controller;
+ _config = config;
+
+ _logger = Logger.getLogger(this.getClass());
+ checkMyParms();
+ } //________________________________
+
+
+ protected void checkMyParms() throws ConfigurationException
+ {
+ _eprCategoryName= _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+
+ String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+ if (!Util.isNullString(attr))
+ try { _maxThreads = Integer.parseInt(attr); }
+ catch (Exception e) { attr = null; }
+ if (Util.isNullString(attr))
+ {
+ _maxThreads = _defaultMaxThreads;
+ _logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+ +" - Using default value of <"+_maxThreads+">");
+ }
+ } //________________________________
+
+ protected void registerProcess() throws RegistryException
+ {
+ _bRegistered = false;
+ _controller.register(_config,_epr);
+ _bRegistered = true;
+ } // ________________________________
+
+ protected void unregisterProcess() throws RegistryException
+ {
+ if (_bRegistered)
+ _controller.unRegister(_eprCategoryName, _eprName, _epr);
+ } //________________________________
+
+ public boolean ensureThreadAvailable()
+ {
+ while ( (_qRunningThreads >= _maxThreads)
+ &&_controller.continueLooping())
+ try { Thread.sleep(_pauseLapseInMillis); }
+ catch (InterruptedException e) { return false; }
+
+ return _controller.continueLooping();
+ } //________________________________
+
+ /**
+ * Wait until the registration process has finished, a PickupCourier has been obtained,
+ * and the thread pool is instantiated
+ */
+ public void waitUntilReady()
+ {
+ while(null==_execService)
+ try { Thread.sleep(_pauseLapseInMillis); }
+ catch (InterruptedException e)
+ { _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
+ } //________________________________
+
+ // Child threads will send a -1 when their run() method ends
+ // we need to prevent picking up Messages when there are no available threads in pool
+ public void update(Observable o, Object arg)
+ {
+ if (arg instanceof Integer)
+ updateThreadCount((Integer)arg);
+ } //________________________________
+
+ protected void resetThreadCount()
+ {
+ synchronized (_synchThreads)
+ { _qRunningThreads = 0;}
+ } //________________________________
+
+ protected void updateThreadCount(Integer i)
+ {
+ synchronized (_synchThreads)
+ { _qRunningThreads += i.intValue();}
+ _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
+ } //________________________________
+
+ protected String obtainAttribute(String p_sAtt, String p_sDefault)
+ throws ConfigurationException
+ {
+ _logger.debug("Reading value for " + p_sAtt);
+ String sVal = _config.getAttribute(p_sAtt);
+ if ((null == sVal) && (null == p_sDefault))
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+
+ /**
+ * Loops until the controlling process determines that looping should end
+ * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
+ * <br/>When one is received, instantiates an action processing pipeline to process it
+ */
+ public void run()
+ {
+ _logger.debug("run() method of "+this.getClass().getSimpleName()
+ +" started on thread "+Thread.currentThread().getName());
+
+ if (initializeRun())
+ {
+ // Instantiate the thread pool and set active count to zero
+ _execService = Executors.newFixedThreadPool(_maxThreads);
+ resetThreadCount();
+
+ setChanged();
+ notifyObservers(Boolean.TRUE);
+
+ while (_controller.continueLooping())
+ {
+ // Only pickup a message when a thread is available
+ if (! ensureThreadAvailable())
+ break;
+
+ long lWait = _controller.millisToWait();
+
+ // This if() is just in case (it should never happen - it's a safety net)
+ if (lWait>0)
+ waitForEventAndProcess(lWait);
+ }
+ setChanged();
+ notifyObservers(Boolean.FALSE);
+ }
+
+ finalizeRun();
+
+ // shutdown() lets already submitted child tasks finish, but does not itself block until they do
+ if (null!=_execService)
+ _execService.shutdown();
+
+ _logger.debug("run() method of "+this.getClass().getSimpleName()
+ +" finished on thread "+Thread.currentThread().getName());
+ } // _______________________________
+
+
+ protected ConfigTree _config;
+ protected ListenerManager _controller;
+
+ protected boolean _bRegistered;
+ protected String _eprCategoryName;
+ protected String _eprName;
+ protected EPR _epr;
+
+ protected int _maxThreads;
+ protected int _defaultMaxThreads;
+ protected long _pauseLapseInMillis;
+ protected ExecutorService _execService;
+
+ private Object _synchThreads = new Short((short)-1);
+ private int _qRunningThreads;
+
+ protected Logger _logger;
+}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-14 18:47:51 UTC (rev 8325)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-14 18:49:41 UTC (rev 8326)
@@ -23,21 +23,18 @@
package org.jboss.soa.esb.listeners.message;
import java.lang.reflect.Method;
-import java.util.Observable;
-import java.util.Observer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
+import org.jboss.soa.esb.listeners.ListenerManager;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
/**
@@ -50,220 +47,117 @@
* @since Version 4.0
*/
-public class MessageAwareListener extends Observable implements Runnable, Observer
+public class MessageAwareListener extends AbstractManagedListener
{
-
/**
* public constructor
- * @param controller EsbListenerController - the controlling process
+ * @param controller ListenerManager - the controlling process
* @param config ConfigTree - Containing 'static' configuration for this instance
* @throws Exception
*/
- public MessageAwareListener(EsbListenerController controller, ConfigTree config)
+ public MessageAwareListener(ListenerManager controller, ConfigTree config)
throws ConfigurationException
{
- _millisToWaitForThread = 50;
- _defaultMaxThreads = 1;
- _controller = controller;
- _config = config;
-
- checkMyParms();
+ super (controller, config);
} // _______________________________
/**
- * Wait until the registration process finished, a PickupCourier was obtained
- * , and the pool thread is instantiated
- */
- public void waitUntilReadyToPickup()
- {
- while(null==_execService)
- try { Thread.sleep(50); }
- catch (InterruptedException e)
- { _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
- } //________________________________
-
- // Child threads will send a -1 when their run() method ends
- // we need to prevent picking up Messages when there are no available threads in pool
- public void update(Observable o, Object arg)
- {
- if (arg instanceof Integer)
- {
- _qRunningThreads += ((Integer)arg).intValue();
- _logger.debug("Thread returned to pool("+_qRunningThreads+"/"+_maxThreads);
- }
- } //________________________________
-
- /**
- * Loops until controlling process determines
- * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
- * <br/>When one is received, instantiates an action processing pipeline to process it
- */
- public void run()
- {
- _logger.debug("run() method of "+this.getClass().getSimpleName()
- +" started on thread "+Thread.currentThread().getName());
-
- boolean bRegisterOK = false;
- try
- {
- registerProcess();
- bRegisterOK = true;
- }
- catch (Exception re)
- { _logger.fatal("Could not register service " + re.getLocalizedMessage(),re); }
-
- if (bRegisterOK)
- {
- // Instantiate the pool thread and set active count to zero
- _execService = Executors.newFixedThreadPool(_maxThreads);
- _qRunningThreads = 0;
-
- setChanged();
- notifyObservers(Boolean.TRUE);
-
- while (_controller.continueLooping())
- {
- // Check to see if there are available threads in pool
- if (_qRunningThreads >= _maxThreads)
- try
- {
- Thread.sleep(_millisToWaitForThread);
- continue;
- }
- catch (InterruptedException e)
- {
- break;
- }
-
- // Only pickup a message when a thread is available
- long lWait = _controller.millisToWait();
-
- // Just in case (this should never happen - it's a safety net)
- if (lWait< 1)
- continue;
-
- Message message = null;
- try { message = (lWait > 0 ) ? _pickUpCourier.pickup(lWait) : null; }
- catch (CourierTimeoutException e) { continue; }
- catch (CourierException e)
- {
- _logger.error(e);
- continue;
- }
-
- if (null!=message)
- {
- ActionProcessingPipeline chain = null;
-
- try
- {
- chain = new ActionProcessingPipeline(message,_config);
- chain.addObserver(this);
- _qRunningThreads++;
- _logger.debug("Thread claimed from pool("+_qRunningThreads+"/"+_maxThreads);
- _execService.execute(chain);
- }
- catch (Exception e) { _logger.error(e); continue; }
-
- }
- }
- setChanged();
- notifyObservers(Boolean.FALSE);
- }
-
- try { unregisterProcess(); }
- catch (Exception re)
- {
- _logger.warn("Could not un register service " + re.getLocalizedMessage());
- }
- // shutdown should wait for all child threads to finish
- if (null!=_execService)
- _execService.shutdown();
-
- _logger.debug("run() method of "+this.getClass().getSimpleName()
- +" finished on thread "+Thread.currentThread().getName());
- } // _______________________________
-
- /**
* Check for mandatory and optional attributes in parameter tree
*
- * @throws Exception -
+ * @throws ConfigurationException -
* if mandatory atts are not right or actionClass not in
* classpath
*/
protected void checkMyParms() throws ConfigurationException
{
- _eprCategoryName= obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
- _eprName = obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);
-
- String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
- if (!Util.isNullString(attr))
- try { _maxThreads = Integer.parseInt(attr); }
- catch (Exception e) { attr = null; }
- if (Util.isNullString(attr))
- {
- _maxThreads = _defaultMaxThreads;
- _logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
- +" - Using default value of <"+_maxThreads+">");
- }
+ super.checkMyParms();
+ if (Util.isNullString(_eprCategoryName))
+ throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ if (Util.isNullString(_eprName))
+ throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_NAME_TAG);
} // ________________________________
- private void registerProcess() throws Exception
- {
- _bRegistered = false;
- _controller.register(_config,_epr);
- _bRegistered = true;
- _pickUpCourier = CourierFactory.getPickupCourier(_epr);
- } // ________________________________
+ @Override
+ public boolean isMessageAware() { return true; }
+
+
+ @Override
+ public boolean finalizeRun()
+ {
+ try
+ {
+ if (null!=_pickUpCourier)
+ try
+ {
+ Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
+ cleanMethod.invoke(_pickUpCourier, new Object[] {});
+ }
+ catch (NoSuchMethodException e) {/* OK Just don't invoke it */ }
+ catch (Exception e)
+ {
+ _logger.error("Problems invoking courier.clean() Method",e);
+ }
- private void unregisterProcess() throws Exception
- {
- if (null!=_pickUpCourier)
- try
- {
- Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
- cleanMethod.invoke(_pickUpCourier, new Object[] {});
- }
- catch (NoSuchMethodException e) {/* OK Just don't invoke it */ }
- catch (Exception e)
- {
- _logger.error("Problems invoking courier.clean() Method",e);
- }
+ unregisterProcess();
+ return true;
+ }
+ catch (RegistryException re)
+ {
+ _logger.warn("Could not un register service " + re.getLocalizedMessage());
+ return false;
+ }
- if (_bRegistered)
- _controller.unRegister(_eprCategoryName, _eprName, _epr);
+ } //________________________________
- } // ________________________________
+ @Override
+ public boolean initializeRun()
+ {
+ try
+ {
+ registerProcess();
+ _pickUpCourier = CourierFactory.getPickupCourier(_epr);
+ return true;
+ }
+ catch (Exception re)
+ {
+ _logger.fatal("Could not register service " + re.getLocalizedMessage(),re);
+ return false;
+ }
+
+ } //________________________________
- protected String obtainAttribute(String p_sAtt, String p_sDefault)
- throws ConfigurationException
+ @Override
+ public void waitForEventAndProcess(long maxWaitMillis)
{
- _logger.info("Reading value for " + p_sAtt);
- String sVal = _config.getAttribute(p_sAtt);
- if ((null == sVal) && (null == p_sDefault))
- throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
-
- return (null != sVal) ? sVal : p_sDefault;
- } // ________________________________
-
+ Message message = null;
+ try { message = (maxWaitMillis > 0 ) ? _pickUpCourier.pickup(maxWaitMillis) : null; }
+ catch (CourierTimeoutException e) { return; }
+ catch (CourierException e)
+ {
+ _logger.error(e);
+ return;
+ }
- protected ConfigTree _config = null;
- protected EsbListenerController _controller = null;
+ if (null!=message)
+ {
+ ActionProcessingPipeline chain = null;
- protected String _eprCategoryName;
- protected String _eprName;
- protected EPR _epr;
-
- protected int _maxThreads;
- protected int _qRunningThreads;
- protected int _defaultMaxThreads;
- protected long _millisToWaitForThread;
- protected ExecutorService _execService;
- protected boolean _bRegistered;
-
- protected PickUpOnlyCourier _pickUpCourier;
+ try
+ {
+ chain = new ActionProcessingPipeline(message,_config);
+ chain.addObserver(this);
+ updateThreadCount(+1);
+ _execService.execute(chain);
+ }
+ catch (Exception e)
+ {
+ _logger.error(e);
+ return;
+ }
+ }
+
+ } //________________________________
- protected static transient Logger _logger = Logger.getLogger(MessageAwareListener.class);
-
+ protected PickUpOnlyCourier _pickUpCourier;
}
More information about the jboss-svn-commits
mailing list