[jboss-svn-commits] JBL Code SVN: r10249 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Mar 15 22:10:28 EDT 2007
Author: bill.burke at jboss.com
Date: 2007-03-15 22:10:28 -0400 (Thu, 15 Mar 2007)
New Revision: 10249
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
formatting
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-03-16 01:59:04 UTC (rev 10248)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2007-03-16 02:10:28 UTC (rev 10249)
@@ -22,13 +22,6 @@
package org.jboss.soa.esb.listeners.message;
-import java.lang.reflect.Method;
-import java.util.Observable;
-import java.util.Observer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.ConfigurationException;
@@ -50,325 +43,332 @@
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
+import java.lang.reflect.Method;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
/**
* Esb Message aware transport independent listener. <p/> Relies on the
* CourierFactory to obtain an appropriate Courier for the EPR this listener
* will be listening on <br/>Keeps a thread pool to instantiate
* ActionProcessingPipelines whenever a Message is received
- *
+ *
* @author <a
* href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
*/
-public class MessageAwareListener extends AbstractThreadedManagedLifecycle implements Observer
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle implements Observer
{
- /**
- * serial version uid for this class
- */
- private static final long serialVersionUID = -9198018611828254359L;
-
- /**
- * public constructor
- *
- * @param config
- * ConfigTree - Containing 'static' configuration for this
- * instance
- * @throws ConfigurationException
- */
- public MessageAwareListener(final ConfigTree config)
- throws ConfigurationException
- {
- super(config);
- _config = config ;
- checkMyParms() ;
- }
+ /**
+ * serial version uid for this class
+ */
+ private static final long serialVersionUID = -9198018611828254359L;
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
- {
- _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ /**
+ * Creates the listener: hands the configuration to the superclass, keeps a reference to it and validates it.
+ *
+ * @param config ConfigTree - Containing 'static' configuration for this
+ * instance; it is read by checkMyParms() during construction
+ * @throws ConfigurationException if checkMyParms() rejects the configuration
+ */
+ public MessageAwareListener(final ConfigTree config)
+ throws ConfigurationException
+ {
+ super(config);
+ _config = config; // must be assigned first: checkMyParms() reads _config
+ checkMyParms(); // NOTE(review): checkMyParms() is protected, so a subclass override would run before the subclass is fully constructed
+ }
- final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-
- if (!Util.isNullString(maxThreadVal))
- {
- try
- {
- _maxThreads = Integer.parseInt(maxThreadVal) ;
- }
- catch (NumberFormatException nfe)
- {
- _maxThreads = _defaultMaxThreads ;
- _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
- }
- }
+ /**
+ * Reads the listener settings from _config: service category and name, max threads, EPR element and poll latency.
+ *
+ * @throws ConfigurationException -
+ * if the service category name, the service name or the EPR child
+ * element is missing
+ */
+ protected void checkMyParms() throws ConfigurationException
+ {
+ _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- if (Util.isNullString(_eprCategoryName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- if (Util.isNullString(_eprName))
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+ final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
- ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
- if (null == eprElement)
- throw new ConfigurationException(
- "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
- _epr = ListenerUtil.assembleEpr(eprElement);
-
- String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- long lSeconds = 10;
- if (null != latency)
- {
- try
- {
- lSeconds = Integer.parseInt(latency);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
- }
- }
- _latencySecs = lSeconds ;
- }
-
- /**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
- */
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- final TwoWayCourier pickUpCourier ;
+ if (!Util.isNullString(maxThreadVal)) // NOTE(review): when the attribute is absent _maxThreads is not assigned here at all
+ {
+ try
+ {
+ _maxThreads = Integer.parseInt(maxThreadVal);
+ }
+ catch (NumberFormatException nfe)
+ {
+ _maxThreads = _defaultMaxThreads; // NOTE(review): _defaultMaxThreads is declared without an initializer in this class, so this is 0 unless set elsewhere
+ _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
+ }
+ }
+
+ if (Util.isNullString(_eprCategoryName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ if (Util.isNullString(_eprName))
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+ ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+ if (null == eprElement)
+ throw new ConfigurationException(
+ "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+ _epr = ListenerUtil.assembleEpr(eprElement);
+
+ String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ long lSeconds = 10; // default poll latency, in seconds, used when the attribute is absent or unparsable
+ if (null != latency)
+ {
+ try
+ {
+ lSeconds = Integer.parseInt(latency);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
+ }
+ }
+ _latencySecs = lSeconds;
+ }
+
+ /**
+ * Obtains the pickup courier for _epr, applies the poll latency to it when possible, then registers the EPR.
+ *
+ * @throws ManagedLifecycleException if the EPR is malformed, no courier can be obtained, setPollLatency fails or registration fails.
+ */
+ protected void doInitialise()
+ throws ManagedLifecycleException
+ {
+ final TwoWayCourier pickUpCourier;
+ try
+ {
+ pickUpCourier = CourierFactory.getPickupCourier(_epr);
+ try
+ {
+ final Method setPollLatency = pickUpCourier.getClass().getMethod(
+ "setPollLatency", new Class[]{Long.class}); // looked up with the boxed Long type: a setPollLatency(long) primitive signature would not match
+ setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs)); // seconds converted to milliseconds
+ }
+ catch (final NoSuchMethodException nsme)
+ {
+ // The courier class has no public setPollLatency(Long): the latency is simply not applied
+ }
+ catch (final Exception ex)
+ {
+ CourierUtil.cleanCourier(pickUpCourier);
+ throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
+ }
+ }
+ catch (final MalformedEPRException mepre)
+ {
+ throw new ManagedLifecycleException("Malformed EPR: " + _epr); // NOTE(review): mepre is not passed as the cause, unlike the other catch blocks in this method
+ }
+ catch (final CourierException ce)
+ {
+ throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+ }
+
+ _pickUpCourier = pickUpCourier;
+
+ try
+ {
+ RegistryUtil.register(_config, _epr);
+ }
+ catch (final RegistryException re)
+ {
+ CourierUtil.cleanCourier(_pickUpCourier); // release the courier obtained above before failing
+ throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+ }
+ }
+
+ /**
+ * Starts the instance: checks any previous executor has terminated, creates a new fixed pool, then calls super.doStart().
+ *
+ * @throws ManagedLifecycleException if a previous executor still has active tasks, or as declared by super.doStart().
+ */
+ protected void doStart()
+ throws ManagedLifecycleException
+ {
+ checkExecutorTermination(); // also clears _execService, so a fresh pool is always created below
+
+ _execService = Executors.newFixedThreadPool(_maxThreads); // NOTE(review): throws IllegalArgumentException if _maxThreads <= 0; checkMyParms() only assigns it when the max-threads attribute is present
+
+ super.doStart();
+ }
+
+ /**
+ * Listener loop: while running, picks up messages unless _maxThreads pipelines are already counted as active; shuts the executor down on exit.
+ */
+ protected void doRun()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
+
+ while (isRunning())
+ {
+ // Only pick up a message when a pool thread is available
+ if (getActiveThreadCount() >= _maxThreads)
+ {
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _pauseLapseInMillis); // presumably pauses up to _pauseLapseInMillis ms or until STOPPING - confirm in superclass
+ }
+ else
+ {
+ waitForEventAndProcess(100); // maxWaitMillis = 100 for one pickup attempt
+ }
+ }
+ if (null != _execService)
+ _execService.shutdown(); // no new tasks accepted; already submitted pipelines are left to finish
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread " + Thread.currentThread().getName());
+ }
+ }
+ /** Picks up at most one message (waiting up to maxWaitMillis) and submits an ActionProcessingPipeline for it to the executor. */
+ public void waitForEventAndProcess(long maxWaitMillis)
+ {
+ Message message = null;
+ try
+ {
+ message = (maxWaitMillis > 0) ? _pickUpCourier
+ .pickup(maxWaitMillis) : null; // a non-positive wait skips the pickup entirely
+ }
+ catch (CourierTimeoutException e)
+ {
+ return; // timeout is not logged: the caller's loop simply tries again
+ }
+ catch (CourierException e)
+ {
+ _logger.error("Courier Exception", e);
+ return;
+ }
+
+ if (null != message)
+ {
+ ActionProcessingPipeline chain = null;
+
+ try
+ {
+ chain = new ActionProcessingPipeline(message, _config);
+ chain.addObserver(this); // the pipeline reports back through update()
+ updateThreadCount(+1); // counted as running before it is submitted
+ _execService.execute(chain); // NOTE(review): if execute() throws (e.g. RejectedExecutionException) the +1 above is never undone
+ }
+ catch (Exception e)
+ {
+ _logger.error("ActionProcessingPipeline exception", e);
+ return;
+ }
+ }
+
+ } // ________________________________
+
+ /**
+ * Destroys the instance: checks the executor has terminated, cleans the pickup courier and unregisters the EPR.
+ *
+ * @throws ManagedLifecycleException if executor tasks are still active (thrown by checkExecutorTermination()).
+ */
+ protected void doDestroy()
+ throws ManagedLifecycleException
+ {
+ checkExecutorTermination(); // NOTE(review): if this throws, the courier cleanup and unregister below are skipped
+
+ CourierUtil.cleanCourier(_pickUpCourier);
+
+ RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+ }
+
+ /**
+ * Waits, if an executor exists, for its tasks to finish within getTerminationPeriod() ms, then drops the reference.
+ *
+ * @throws ManagedLifecycleException If executor tasks are still active after the wait, or the wait is interrupted.
+ */
+ private void checkExecutorTermination()
+ throws ManagedLifecycleException
+ {
+ if (_execService != null)
+ {
+ try
+ {
try
{
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
- throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
- }
+ if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS)) // false means the timeout elapsed before termination
+ {
+ throw new ManagedLifecycleException("Tasks remain active in executor");
+ }
}
- catch (final MalformedEPRException mepre)
+ catch (final InterruptedException ie)
{
- throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+ throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate"); // NOTE(review): ie is not chained as the cause and the interrupt flag is not restored
}
- catch (final CourierException ce)
- {
- throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
- }
-
- _pickUpCourier = pickUpCourier ;
-
- try
- {
- RegistryUtil.register(_config, _epr);
- }
- catch (final RegistryException re)
- {
- CourierUtil.cleanCourier(_pickUpCourier);
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
- }
- }
-
- /**
- * Handle the start of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while starting.
- */
- protected void doStart()
- throws ManagedLifecycleException
- {
- checkExecutorTermination() ;
-
- _execService = Executors.newFixedThreadPool(_maxThreads) ;
-
- super.doStart() ;
- }
-
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("doRun() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
-
- while (isRunning())
- {
- // Only pickup a message when a thread is available
- if (getActiveThreadCount() >= _maxThreads)
- {
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _pauseLapseInMillis) ;
- }
- else
- {
- waitForEventAndProcess(100) ;
- }
- }
- if (null != _execService)
- _execService.shutdown();
+ }
+ finally
+ {
+ _execService = null; // dropped on every path, including both throws above
+ }
+ }
+ }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread " + Thread.currentThread().getName());
- }
- }
-
- public void waitForEventAndProcess (long maxWaitMillis)
- {
- Message message = null;
- try
- {
- message = (maxWaitMillis > 0) ? _pickUpCourier
- .pickup(maxWaitMillis) : null;
- }
- catch (CourierTimeoutException e)
- {
- return;
- }
- catch (CourierException e)
- {
- _logger.error("Courier Exception", e);
- return;
- }
+ // Observer callback for the pipelines registered with addObserver(this): child threads send
+ // a -1 when their run() method ends, and that Integer is added to the running-thread count so
+ // that no Messages are picked up while no pool thread is available. Non-Integer args are ignored.
+ public void update(Observable o, Object arg)
+ {
+ if (arg instanceof Integer)
+ updateThreadCount((Integer) arg);
+ }
- if (null != message)
- {
- ActionProcessingPipeline chain = null;
+ private int getActiveThreadCount() // returns the current value of the running-pipeline counter
+ {
+ synchronized (_synchThreads) // same monitor as updateThreadCount()
+ {
+ return _qRunningThreads;
+ }
+ }
- try
- {
- chain = new ActionProcessingPipeline(message, _config);
- chain.addObserver(this);
- updateThreadCount(+1);
- _execService.execute(chain);
- }
- catch (Exception e)
- {
- _logger.error("ActionProcessingPipeline exception", e);
- return;
- }
- }
+ private void updateThreadCount(Integer i) // adds the signed delta i to the running-pipeline counter
+ {
+ synchronized (_synchThreads) // same monitor as getActiveThreadCount()
+ {
+ _qRunningThreads += i.intValue(); // i must be non-null: intValue() on null would throw NullPointerException
+ }
+ }
- } // ________________________________
-
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- checkExecutorTermination() ;
-
- CourierUtil.cleanCourier(_pickUpCourier);
-
- RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
- }
-
- /**
- * Check that the existing executor has been closed down.
- * @throws ManagedLifecycleException If executor tasks are still active.
- */
- private void checkExecutorTermination()
- throws ManagedLifecycleException
- {
- if (_execService != null)
- {
- try
- {
- try
- {
- if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
- {
- throw new ManagedLifecycleException("Tasks remain active in executor") ;
- }
- }
- catch (final InterruptedException ie)
- {
- throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
- }
- }
- finally
- {
- _execService = null ;
- }
- }
- }
-
- // Child threads will send a -1 when their run() method ends
- // we need to prevent picking up Messages when there are no available
- // threads in pool
- public void update(Observable o, Object arg)
- {
- if (arg instanceof Integer)
- updateThreadCount((Integer) arg);
- }
+ private ConfigTree _config;
- private int getActiveThreadCount()
- {
- synchronized (_synchThreads)
- {
- return _qRunningThreads ;
- }
- }
+ private String _eprCategoryName;
- private void updateThreadCount(Integer i)
- {
- synchronized (_synchThreads)
- {
- _qRunningThreads += i.intValue();
- }
- }
+ private String _eprName;
- private ConfigTree _config;
+ private EPR _epr;
- private String _eprCategoryName;
+ private int _maxThreads;
- private String _eprName;
+ private int _defaultMaxThreads;
- private EPR _epr;
+ private long _latencySecs;
- private int _maxThreads;
+ private long _pauseLapseInMillis = 50;
- private int _defaultMaxThreads;
-
- private long _latencySecs;
-
- private long _pauseLapseInMillis = 50 ;
+ private ExecutorService _execService;
- private ExecutorService _execService;
+ private Object _synchThreads = new Short((short) -1);
- private Object _synchThreads = new Short((short) -1);
-
- private int _qRunningThreads;
+ private int _qRunningThreads;
- private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-
- private PickUpOnlyCourier _pickUpCourier;
+ private Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+ private PickUpOnlyCourier _pickUpCourier;
}
More information about the jboss-svn-commits
mailing list