[jboss-svn-commits] JBL Code SVN: r11104 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Apr 18 18:31:00 EDT 2007
Author: bill.burke at jboss.com
Date: 2007-04-18 18:31:00 -0400 (Wed, 18 Apr 2007)
New Revision: 11104
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
formatting
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-04-18 20:07:44 UTC (rev 11103)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-04-18 22:31:00 UTC (rev 11104)
@@ -53,369 +53,376 @@
* CourierFactory to obtain an appropriate Courier for the EPR this listener
* will be listening on <br/>Keeps a thread pool to instantiate
* ActionProcessingPipelines whenever a Message is received
- *
+ *
* @author <a
* href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
*/
-public class MessageAwareListener extends AbstractThreadedManagedLifecycle
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle
{
- /**
- * serial version uid for this class
- */
- private static final long serialVersionUID = -9198018611828254359L;
-
- /**
- * The minimum error delay.
- */
- private static final long MIN_ERROR_DELAY = 1000 ;
- /**
- * The maximum error delay.
- */
- private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5) ;
-
- /**
- * The action pipeline.
- */
- private ActionProcessingPipeline pipeline ;
-
- /**
- * The error delay.
- */
- private long errorDelay ;
-
- /**
- * public constructor
- *
- * @param config
- * ConfigTree - Containing 'static' configuration for this
- * instance
- * @throws ConfigurationException
- */
- public MessageAwareListener(final ConfigTree config)
- throws ConfigurationException
- {
- super(config);
- _config = config ;
- checkMyParms() ;
- }
+ /**
+ * serial version uid for this class
+ */
+ private static final long serialVersionUID = -9198018611828254359L;
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
- {
- _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ /**
+ * The minimum error delay.
+ */
+ private static final long MIN_ERROR_DELAY = 1000;
+ /**
+ * The maximum error delay.
+ */
+ private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
- final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-
- if (!Util.isNullString(maxThreadVal))
- {
- try
- {
- _maxThreads = Integer.parseInt(maxThreadVal) ;
- }
- catch (NumberFormatException nfe)
- {
- _maxThreads = _defaultMaxThreads ;
- _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
- }
- }
+ /**
+ * The action pipeline.
+ */
+ private ActionProcessingPipeline pipeline;
- if (Util.isNullString(_eprCategoryName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- if (Util.isNullString(_eprName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+ /**
+ * The error delay.
+ */
+ private long errorDelay;
- ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
- if (null == eprElement)
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
- _epr = ListenerUtil.assembleEpr(eprElement);
-
- String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- long lSeconds = 10;
- if (null != latency)
- {
- try
- {
- lSeconds = Integer.parseInt(latency);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
- }
- }
- _latencySecs = lSeconds ;
- }
-
- /**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
- */
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- final ActionProcessingPipeline pipeline ;
- try
+ /**
+ * public constructor
+ *
+ * @param config ConfigTree - Containing 'static' configuration for this
+ * instance
+ * @throws ConfigurationException
+ */
+ public MessageAwareListener(final ConfigTree config)
+ throws ConfigurationException
+ {
+ super(config);
+ _config = config;
+ checkMyParms();
+ }
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws ConfigurationException
+ {
+ _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+
+ final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
+
+ if (!Util.isNullString(maxThreadVal))
+ {
+ try
+ {
+ _maxThreads = Integer.parseInt(maxThreadVal);
+ }
+ catch (NumberFormatException nfe)
+ {
+ _maxThreads = _defaultMaxThreads;
+ _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
+ }
+ }
+
+ if (Util.isNullString(_eprCategoryName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ if (Util.isNullString(_eprName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+ ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+ if (null == eprElement)
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+ _epr = ListenerUtil.assembleEpr(eprElement);
+
+ String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ long lSeconds = 10;
+ if (null != latency)
+ {
+ try
+ {
+ lSeconds = Integer.parseInt(latency);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
+ }
+ }
+ _latencySecs = lSeconds;
+ }
+
+ /**
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise()
+ throws ManagedLifecycleException
+ {
+ final ActionProcessingPipeline pipeline;
+ try
+ {
+ pipeline = new ActionProcessingPipeline(_config);
+ pipeline.initialise();
+ }
+ catch (final ConfigurationException ce)
+ {
+ throw new ManagedLifecycleException("Error configuring action processing pipeline", ce);
+ }
+ this.pipeline = pipeline;
+ final TwoWayCourier pickUpCourier;
+ try
+ {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr);
+ try
+ {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[]{Long.class});
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+ }
+ catch (final NoSuchMethodException nsme)
+ {
+ // OK, just leave it null
+ }
+ catch (final Exception ex)
+ {
+ CourierUtil.cleanCourier(pickUpCourier);
+ throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
+ }
+ }
+ catch (final MalformedEPRException mepre)
+ {
+ throw new ManagedLifecycleException("Malformed EPR: " + _epr);
+ }
+ catch (final CourierException ce)
+ {
+ throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+ }
+
+ _pickUpCourier = pickUpCourier;
+
+ try
+ {
+ RegistryUtil.register(_config, _epr);
+ }
+ catch (final RegistryException re)
+ {
+ CourierUtil.cleanCourier(_pickUpCourier);
+ throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+ }
+ }
+
+ /**
+ * Handle the start of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while starting.
+ */
+ protected void doStart()
+ throws ManagedLifecycleException
+ {
+ checkExecutorTermination();
+
+ _execService = Executors.newFixedThreadPool(_maxThreads);
+
+ super.doStart();
+ }
+
+ /**
+ * Execute on the thread.
+ */
+ protected void doRun()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
+
+ while (isRunning())
+ {
+ // Only pickup a message when a thread is available
+ if (waitForThread(_pauseLapseInMillis))
+ {
+ waitForEventAndProcess(100);
+ }
+ }
+ if (null != _execService)
+ _execService.shutdown();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread " + Thread.currentThread().getName());
+ }
+ }
+
+ public void waitForEventAndProcess(long maxWaitMillis)
+ {
+ final Message message;
+ try
+ {
+ message = (maxWaitMillis > 0) ? _pickUpCourier
+ .pickup(maxWaitMillis) : null;
+ errorDelay = 0;
+ }
+ catch (CourierTimeoutException e)
+ {
+ return;
+ }
+ catch (CourierException e)
+ {
+ _logger.error("Courier Exception", e);
+ if (errorDelay == 0)
+ {
+ errorDelay = MIN_ERROR_DELAY;
+ }
+ else if (errorDelay < MAX_ERROR_DELAY)
+ {
+ errorDelay <<= 1;
+ }
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay);
+ return;
+ }
+
+ if (null != message)
+ {
+ try
+ {
+ final Runnable pipelineRunner = new Runnable()
{
- pipeline = new ActionProcessingPipeline(_config) ;
- pipeline.initialise() ;
- }
- catch (final ConfigurationException ce)
- {
- throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
- }
- this.pipeline = pipeline ;
- final TwoWayCourier pickUpCourier ;
+ public void run()
+ {
+ try
+ {
+ pipeline.process(message);
+ }
+ finally
+ {
+ updateThreadCount(-1);
+ }
+ }
+ };
+ updateThreadCount(+1);
+ _execService.execute(pipelineRunner);
+ }
+ catch (Exception e)
+ {
+ _logger.error("ActionProcessingPipeline exception", e);
+ return;
+ }
+ }
+
+ } // ________________________________
+
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy()
+ throws ManagedLifecycleException
+ {
+ checkExecutorTermination();
+
+ pipeline.destroy();
+ pipeline = null;
+
+ CourierUtil.cleanCourier(_pickUpCourier);
+
+ RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+ }
+
+ /**
+ * Check that the existing executor has been closed down.
+ *
+ * @throws ManagedLifecycleException If executor tasks are still active.
+ */
+ private void checkExecutorTermination()
+ throws ManagedLifecycleException
+ {
+ if (_execService != null)
+ {
+ try
+ {
try
{
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
- throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
- }
+ if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
+ {
+ throw new ManagedLifecycleException("Tasks remain active in executor");
+ }
}
- catch (final MalformedEPRException mepre)
+ catch (final InterruptedException ie)
{
- throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+ throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate");
}
- catch (final CourierException ce)
- {
- throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
- }
-
- _pickUpCourier = pickUpCourier ;
-
+ }
+ finally
+ {
+ _execService = null;
+ }
+ }
+ }
+
+ private boolean waitForThread(final long delay)
+ {
+ boolean result = true;
+ synchronized (_synchThreads)
+ {
+ if (_qRunningThreads >= _maxThreads)
+ {
try
{
- RegistryUtil.register(_config, _epr);
+ _synchThreads.wait(delay);
}
- catch (final RegistryException re)
+ catch (final InterruptedException ie)
{
- CourierUtil.cleanCourier(_pickUpCourier);
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
}
- }
-
- /**
- * Handle the start of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while starting.
- */
- protected void doStart()
- throws ManagedLifecycleException
- {
- checkExecutorTermination() ;
-
- _execService = Executors.newFixedThreadPool(_maxThreads) ;
-
- super.doStart() ;
- }
-
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("doRun() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
-
- while (isRunning())
- {
- // Only pickup a message when a thread is available
- if (waitForThread(_pauseLapseInMillis))
- {
- waitForEventAndProcess(100) ;
- }
- }
- if (null != _execService)
- _execService.shutdown();
+ result = _qRunningThreads < _maxThreads;
+ }
+ }
+ return result;
+ }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread " + Thread.currentThread().getName());
- }
- }
-
- public void waitForEventAndProcess (long maxWaitMillis)
- {
- final Message message ;
- try
- {
- message = (maxWaitMillis > 0) ? _pickUpCourier
- .pickup(maxWaitMillis) : null;
- errorDelay = 0 ;
- }
- catch (CourierTimeoutException e)
- {
- return;
- }
- catch (CourierException e)
- {
- _logger.error("Courier Exception", e);
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
- return;
- }
+ private void updateThreadCount(Integer i)
+ {
+ synchronized (_synchThreads)
+ {
+ _qRunningThreads += i.intValue();
+ if (_qRunningThreads < _maxThreads)
+ {
+ _synchThreads.notifyAll();
+ }
+ }
+ }
- if (null != message)
- {
- try
- {
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(message) ;
- } finally {
- updateThreadCount(-1) ;
- }
- }
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
- }
- catch (Exception e)
- {
- _logger.error("ActionProcessingPipeline exception", e);
- return;
- }
- }
+ private ConfigTree _config;
- } // ________________________________
-
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- checkExecutorTermination() ;
-
- pipeline.destroy() ;
- pipeline = null ;
-
- CourierUtil.cleanCourier(_pickUpCourier);
-
- RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
- }
-
- /**
- * Check that the existing executor has been closed down.
- * @throws ManagedLifecycleException If executor tasks are still active.
- */
- private void checkExecutorTermination()
- throws ManagedLifecycleException
- {
- if (_execService != null)
- {
- try
- {
- try
- {
- if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
- {
- throw new ManagedLifecycleException("Tasks remain active in executor") ;
- }
- }
- catch (final InterruptedException ie)
- {
- throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
- }
- }
- finally
- {
- _execService = null ;
- }
- }
- }
+ private String _eprCategoryName;
- private boolean waitForThread(final long delay)
- {
- boolean result = true ;
- synchronized(_synchThreads)
- {
- if (_qRunningThreads >= _maxThreads)
- {
- try
- {
- _synchThreads.wait(delay) ;
- }
- catch (final InterruptedException ie) {}
- result = _qRunningThreads < _maxThreads ;
- }
- }
- return result ;
- }
+ private String _eprName;
- private void updateThreadCount(Integer i)
- {
- synchronized (_synchThreads)
- {
- _qRunningThreads += i.intValue();
- if (_qRunningThreads < _maxThreads)
- {
- _synchThreads.notifyAll() ;
- }
- }
- }
+ private EPR _epr;
- private ConfigTree _config;
+ private int _maxThreads;
- private String _eprCategoryName;
+ private int _defaultMaxThreads = 1;
- private String _eprName;
+ private long _latencySecs;
- private EPR _epr;
+ private long _pauseLapseInMillis = 50;
- private int _maxThreads;
+ private ExecutorService _execService;
- private int _defaultMaxThreads = 1;
-
- private long _latencySecs;
-
- private long _pauseLapseInMillis = 50 ;
+ private Object _synchThreads = new Short((short) -1);
- private ExecutorService _execService;
+ private int _qRunningThreads;
- private Object _synchThreads = new Short((short) -1);
-
- private int _qRunningThreads;
+ private Logger _logger = Logger.getLogger(MessageAwareListener.class);
- private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-
- private PickUpOnlyCourier _pickUpCourier;
+ private PickUpOnlyCourier _pickUpCourier;
}
More information about the jboss-svn-commits
mailing list