[jboss-svn-commits] JBL Code SVN: r15836 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Oct 15 07:15:55 EDT 2007
Author: tfennelly
Date: 2007-10-15 07:15:55 -0400 (Mon, 15 Oct 2007)
New Revision: 15836
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Formatting only - to make it more readable.
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-10-15 10:45:00 UTC (rev 15835)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-10-15 11:15:55 UTC (rev 15836)
@@ -22,21 +22,12 @@
package org.jboss.soa.esb.listeners.message;
-import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.couriers.*;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -48,368 +39,324 @@
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
/**
* Esb Message aware transport independent listener. <p/> Relies on the
* CourierFactory to obtain an appropriate Courier for the EPR this listener
* will be listening on <br/>Keeps a thread pool to instantiate
* ActionProcessingPipelines whenever a Message is received
- *
+ *
* @author <a
* href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
*/
-public class MessageAwareListener extends AbstractThreadedManagedLifecycle
-{
- /**
- * serial version uid for this class
- */
- private static final long serialVersionUID = -9198018611828254359L;
-
- /**
- * The minimum error delay.
- */
- private static final long MIN_ERROR_DELAY = 1000 ;
- /**
- * The maximum error delay.
- */
- private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5) ;
-
- /**
- * The action pipeline.
- */
- private ActionProcessingPipeline pipeline ;
-
- /**
- * The error delay.
- */
- private long errorDelay ;
-
- /**
- * public constructor
- *
- * @param config
- * ConfigTree - Containing 'static' configuration for this
- * instance
- * @throws ConfigurationException
- */
- public MessageAwareListener(final ConfigTree config)
- throws ConfigurationException
- {
- super(config);
- _config = config ;
- checkMyParms() ;
- }
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle {
+ /**
+ * serial version uid for this class
+ */
+ private static final long serialVersionUID = -9198018611828254359L;
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
- {
- _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ /**
+ * The minimum error delay.
+ */
+ private static final long MIN_ERROR_DELAY = 1000;
+ /**
+ * The maximum error delay.
+ */
+ private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
- final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-
- if (!Util.isNullString(maxThreadVal))
- {
- try
- {
- _maxThreads = Integer.parseInt(maxThreadVal) ;
- }
- catch (NumberFormatException nfe)
- {
- _maxThreads = _defaultMaxThreads ;
- _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
- }
- }
+ /**
+ * The action pipeline.
+ */
+ private ActionProcessingPipeline pipeline;
- if (Util.isNullString(_eprCategoryName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- if (Util.isNullString(_eprName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+ /**
+ * The error delay.
+ */
+ private long errorDelay;
- ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
- if (null == eprElement)
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
- _epr = ListenerUtil.assembleEpr(eprElement);
-
- String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- long lSeconds = 10;
- if (null != latency)
- {
- try
- {
- lSeconds = Integer.parseInt(latency);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
- }
- }
- _latencySecs = lSeconds ;
- }
-
- /**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
- */
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- final ActionProcessingPipeline pipeline ;
- try
- {
- pipeline = new ActionProcessingPipeline(_config) ;
- pipeline.initialise() ;
+ /**
+ * public constructor
+ *
+ * @param config ConfigTree - Containing 'static' configuration for this
+ * instance
+ * @throws ConfigurationException
+ */
+ public MessageAwareListener(final ConfigTree config)
+ throws ConfigurationException {
+ super(config);
+ _config = config;
+ checkMyParms();
+ }
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws ConfigurationException {
+ _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+
+ final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
+
+ if (!Util.isNullString(maxThreadVal)) {
+ try {
+ _maxThreads = Integer.parseInt(maxThreadVal);
}
- catch (final ConfigurationException ce)
- {
- throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
+ catch (NumberFormatException nfe) {
+ _maxThreads = _defaultMaxThreads;
+ _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
}
- this.pipeline = pipeline ;
- final TwoWayCourier pickUpCourier ;
- try
- {
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
- throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
- }
+ }
+
+ if (Util.isNullString(_eprCategoryName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ if (Util.isNullString(_eprName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+ ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+ if (null == eprElement)
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+ _epr = ListenerUtil.assembleEpr(eprElement);
+
+ String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ long lSeconds = 10;
+ if (null != latency) {
+ try {
+ lSeconds = Integer.parseInt(latency);
}
- catch (final MalformedEPRException mepre)
- {
- throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+ catch (NumberFormatException e) {
+ _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
}
- catch (final CourierException ce)
- {
- throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+ }
+ _latencySecs = lSeconds;
+ }
+
+ /**
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise()
+ throws ManagedLifecycleException {
+ final ActionProcessingPipeline pipeline;
+ try {
+ pipeline = new ActionProcessingPipeline(_config);
+ pipeline.initialise();
+ }
+ catch (final ConfigurationException ce) {
+ throw new ManagedLifecycleException("Error configuring action processing pipeline", ce);
+ }
+ this.pipeline = pipeline;
+ final TwoWayCourier pickUpCourier;
+ try {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr);
+ try {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[]{Long.class});
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
}
-
- _pickUpCourier = pickUpCourier ;
-
- try
- {
- RegistryUtil.register(_config, _epr);
+ catch (final NoSuchMethodException nsme) {
+ // OK, just leave it null
}
- catch (final RegistryException re)
- {
- CourierUtil.cleanCourier(_pickUpCourier);
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+ catch (final Exception ex) {
+ CourierUtil.cleanCourier(pickUpCourier);
+ throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
}
}
-
- /**
- * Handle the start of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while starting.
- */
- protected void doStart()
- throws ManagedLifecycleException
- {
- checkExecutorTermination() ;
-
- _execService = Executors.newFixedThreadPool(_maxThreads) ;
-
- super.doStart() ;
+ catch (final MalformedEPRException mepre) {
+ throw new ManagedLifecycleException("Malformed EPR: " + _epr);
}
-
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("doRun() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
-
- while (isRunning())
- {
- // Only pickup a message when a thread is available
- if (waitForThread(_pauseLapseInMillis))
- {
- waitForEventAndProcess(100) ;
- }
- }
+ catch (final CourierException ce) {
+ throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+ }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread " + Thread.currentThread().getName());
+ _pickUpCourier = pickUpCourier;
+
+ try {
+ RegistryUtil.register(_config, _epr);
+ }
+ catch (final RegistryException re) {
+ CourierUtil.cleanCourier(_pickUpCourier);
+ throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+ }
+ }
+
+ /**
+ * Handle the start of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while starting.
+ */
+ protected void doStart()
+ throws ManagedLifecycleException {
+ checkExecutorTermination();
+
+ _execService = Executors.newFixedThreadPool(_maxThreads);
+
+ super.doStart();
+ }
+
+ /**
+ * Execute on the thread.
+ */
+ protected void doRun() {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
+
+ while (isRunning()) {
+ // Only pickup a message when a thread is available
+ if (waitForThread(_pauseLapseInMillis)) {
+ waitForEventAndProcess(100);
}
}
-
- public void waitForEventAndProcess (long maxWaitMillis)
- {
- final Message message ;
- try
- {
- message = (maxWaitMillis > 0) ? _pickUpCourier
- .pickup(maxWaitMillis) : null;
- errorDelay = 0 ;
- }
- catch (CourierTimeoutException e)
- {
- return;
- }
- catch (CourierException e)
- {
- _logger.debug("Courier Exception", e);
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
- return;
- }
- if (null != message)
- {
- final Runnable pipelineRunner = new Runnable() {
- public void run() {
- try {
- pipeline.process(message) ;
- } finally {
- updateThreadCount(-1) ;
- }
- }
- } ;
- updateThreadCount(+1);
- _execService.execute(pipelineRunner);
- }
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread " + Thread.currentThread().getName());
+ }
+ }
- } // ________________________________
-
- /**
- * Handle the threaded destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doThreadedDestroy()
- throws ManagedLifecycleException
- {
- if (_execService != null)
- {
- _execService.shutdown() ;
- checkExecutorTermination() ;
+ public void waitForEventAndProcess(long maxWaitMillis) {
+ final Message message;
+ try {
+ message = (maxWaitMillis > 0) ? _pickUpCourier
+ .pickup(maxWaitMillis) : null;
+ errorDelay = 0;
+ }
+ catch (CourierTimeoutException e) {
+ return;
+ }
+ catch (CourierException e) {
+ _logger.debug("Courier Exception", e);
+ if (errorDelay == 0) {
+ errorDelay = MIN_ERROR_DELAY;
+ } else if (errorDelay < MAX_ERROR_DELAY) {
+ errorDelay <<= 1;
}
-
- pipeline.destroy() ;
- pipeline = null ;
-
- CourierUtil.cleanCourier(_pickUpCourier);
- RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
+ _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds");
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay);
+ return;
}
-
- /**
- * Check that the existing executor has been closed down.
- * @throws ManagedLifecycleException If executor tasks are still active.
- */
- private void checkExecutorTermination()
- throws ManagedLifecycleException
- {
- if (_execService != null)
- {
- try
- {
- try
- {
- if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
- {
- throw new ManagedLifecycleException("Tasks remain active in executor") ;
- }
+
+ if (null != message) {
+ final Runnable pipelineRunner = new Runnable() {
+ public void run() {
+ try {
+ pipeline.process(message);
+ } finally {
+ updateThreadCount(-1);
}
- catch (final InterruptedException ie)
- {
- throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
+ }
+ };
+ updateThreadCount(+1);
+ _execService.execute(pipelineRunner);
+ }
+
+ } // ________________________________
+
+ /**
+ * Handle the threaded destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doThreadedDestroy()
+ throws ManagedLifecycleException {
+ if (_execService != null) {
+ _execService.shutdown();
+ checkExecutorTermination();
+ }
+
+ pipeline.destroy();
+ pipeline = null;
+
+ CourierUtil.cleanCourier(_pickUpCourier);
+ RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+ }
+
+ /**
+ * Check that the existing executor has been closed down.
+ *
+ * @throws ManagedLifecycleException If executor tasks are still active.
+ */
+ private void checkExecutorTermination()
+ throws ManagedLifecycleException {
+ if (_execService != null) {
+ try {
+ try {
+ if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS)) {
+ throw new ManagedLifecycleException("Tasks remain active in executor");
}
}
- finally
- {
- _execService = null ;
+ catch (final InterruptedException ie) {
+ throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate");
}
}
+ finally {
+ _execService = null;
+ }
}
+ }
- private boolean waitForThread(final long delay)
- {
- boolean result = true ;
- synchronized(_synchThreads)
- {
- if (_qRunningThreads >= _maxThreads)
- {
- try
- {
- _synchThreads.wait(delay) ;
- }
- catch (final InterruptedException ie) {}
- result = _qRunningThreads < _maxThreads ;
+ private boolean waitForThread(final long delay) {
+ boolean result = true;
+ synchronized (_synchThreads) {
+ if (_qRunningThreads >= _maxThreads) {
+ try {
+ _synchThreads.wait(delay);
}
+ catch (final InterruptedException ie) {
+ }
+ result = _qRunningThreads < _maxThreads;
}
- return result ;
}
+ return result;
+ }
- private void updateThreadCount(Integer i)
- {
- synchronized (_synchThreads)
- {
- _qRunningThreads += i.intValue();
- if (_qRunningThreads < _maxThreads)
- {
- _synchThreads.notifyAll() ;
- }
+ private void updateThreadCount(Integer i) {
+ synchronized (_synchThreads) {
+ _qRunningThreads += i.intValue();
+ if (_qRunningThreads < _maxThreads) {
+ _synchThreads.notifyAll();
}
}
+ }
- private ConfigTree _config;
+ private ConfigTree _config;
- private String _eprCategoryName;
+ private String _eprCategoryName;
- private String _eprName;
+ private String _eprName;
- private EPR _epr;
+ private EPR _epr;
- private int _maxThreads;
+ private int _maxThreads;
- private int _defaultMaxThreads = 1;
-
- private long _latencySecs;
-
- private long _pauseLapseInMillis = 50 ;
+ private int _defaultMaxThreads = 1;
- private ExecutorService _execService;
+ private long _latencySecs;
- private Object _synchThreads = new Short((short) -1);
-
- private int _qRunningThreads;
+ private long _pauseLapseInMillis = 50;
- private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-
- private PickUpOnlyCourier _pickUpCourier;
+ private ExecutorService _execService;
+
+ private Object _synchThreads = new Short((short) -1);
+
+ private int _qRunningThreads;
+
+ private Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+ private PickUpOnlyCourier _pickUpCourier;
}
More information about the jboss-svn-commits
mailing list