[jboss-svn-commits] JBL Code SVN: r15836 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Oct 15 07:15:55 EDT 2007


Author: tfennelly
Date: 2007-10-15 07:15:55 -0400 (Mon, 15 Oct 2007)
New Revision: 15836

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Formatting only - to make it more readable.

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-10-15 10:45:00 UTC (rev 15835)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-10-15 11:15:55 UTC (rev 15836)
@@ -22,21 +22,12 @@
 
 package org.jboss.soa.esb.listeners.message;
 
-import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierTimeoutException;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.couriers.*;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -48,368 +39,324 @@
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
 
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Esb Message aware transport independent listener. <p/> Relies on the
  * CourierFactory to obtain an appropriate Courier for the EPR this listener
  * will be listening on <br/>Keeps a thread pool to instantiate
  * ActionProcessingPipelines whenever a Message is received
- * 
+ *
  * @author <a
  *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
  * @since Version 4.0
  */
 
-public class MessageAwareListener  extends AbstractThreadedManagedLifecycle
-{
-        /**
-         * serial version uid for this class
-         */
-        private static final long serialVersionUID = -9198018611828254359L;
-        
-        /**
-         * The minimum error delay.
-         */
-        private static final long MIN_ERROR_DELAY = 1000 ;
-        /**
-         * The maximum error delay.
-         */
-        private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5) ;
-        
-        /**
-         * The action pipeline.
-         */
-        private ActionProcessingPipeline pipeline ;
-    
-        /**
-         * The error delay.
-         */
-        private long errorDelay ;
-        
-        /**
-	 * public constructor
-	 * 
-	 * @param config
-	 *            ConfigTree - Containing 'static' configuration for this
-	 *            instance
-	 * @throws ConfigurationException
-	 */
-	public MessageAwareListener(final ConfigTree config)
-			throws ConfigurationException
-	{
-            super(config);
-            _config = config ;
-            checkMyParms() ;
-	}
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle {
+    /**
+     * serial version uid for this class
+     */
+    private static final long serialVersionUID = -9198018611828254359L;
 
-	/**
-	 * Check for mandatory and optional attributes in parameter tree
-	 * 
-	 * @throws ConfigurationException -
-	 *             if mandatory atts are not right or actionClass not in
-	 *             classpath
-	 */
-	protected void checkMyParms () throws ConfigurationException
-	{
-                _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-                _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+    /**
+     * The minimum error delay.
+     */
+    private static final long MIN_ERROR_DELAY = 1000;
+    /**
+     * The maximum error delay.
+     */
+    private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
 
-                final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-                
-                if (!Util.isNullString(maxThreadVal))
-                {
-                    try
-                    {
-                        _maxThreads = Integer.parseInt(maxThreadVal) ;
-                    }
-                    catch (NumberFormatException nfe)
-                    {
-                        _maxThreads = _defaultMaxThreads ;
-                        _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
-                    }
-                }
+    /**
+     * The action pipeline.
+     */
+    private ActionProcessingPipeline pipeline;
 
-		if (Util.isNullString(_eprCategoryName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-		if (Util.isNullString(_eprName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+    /**
+     * The error delay.
+     */
+    private long errorDelay;
 
-		ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
-		if (null == eprElement)
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
-		_epr = ListenerUtil.assembleEpr(eprElement);
-                
-                String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
-                long lSeconds = 10;
-                if (null != latency)
-                {
-                    try
-                    {
-                        lSeconds = Integer.parseInt(latency);
-                    }
-                    catch (NumberFormatException e)
-                    {
-                        _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
-                    }
-                }
-                _latencySecs = lSeconds ;                
-	}
-        
-        /**
-         * Handle the initialisation of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while initialisation.
-         */
-        protected void doInitialise()
-            throws ManagedLifecycleException
-        {
-            final ActionProcessingPipeline pipeline ;
-            try
-            {
-                pipeline = new ActionProcessingPipeline(_config) ;
-                pipeline.initialise() ;
+    /**
+     * public constructor
+     *
+     * @param config ConfigTree - Containing 'static' configuration for this
+     *               instance
+     * @throws ConfigurationException
+     */
+    public MessageAwareListener(final ConfigTree config)
+            throws ConfigurationException {
+        super(config);
+        _config = config;
+        checkMyParms();
+    }
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     *
+     * @throws ConfigurationException -
+     *                                if mandatory atts are not right or actionClass not in
+     *                                classpath
+     */
+    protected void checkMyParms() throws ConfigurationException {
+        _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+        _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+
+        final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
+
+        if (!Util.isNullString(maxThreadVal)) {
+            try {
+                _maxThreads = Integer.parseInt(maxThreadVal);
             }
-            catch (final ConfigurationException ce)
-            {
-                throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
+            catch (NumberFormatException nfe) {
+                _maxThreads = _defaultMaxThreads;
+                _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
             }
-            this.pipeline = pipeline ;
-            final TwoWayCourier pickUpCourier ;
-            try
-            {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
-                }
+        }
+
+        if (Util.isNullString(_eprCategoryName))
+            throw new ConfigurationException(
+                    "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+        if (Util.isNullString(_eprName))
+            throw new ConfigurationException(
+                    "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+        ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+        if (null == eprElement)
+            throw new ConfigurationException(
+                    "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+        _epr = ListenerUtil.assembleEpr(eprElement);
+
+        String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+        long lSeconds = 10;
+        if (null != latency) {
+            try {
+                lSeconds = Integer.parseInt(latency);
             }
-            catch (final MalformedEPRException mepre)
-            {
-                throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+            catch (NumberFormatException e) {
+                _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
             }
-            catch (final CourierException ce)
-            {
-                throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+        }
+        _latencySecs = lSeconds;
+    }
+
+    /**
+     * Handle the initialisation of the managed instance.
+     *
+     * @throws ManagedLifecycleException for errors while initialisation.
+     */
+    protected void doInitialise()
+            throws ManagedLifecycleException {
+        final ActionProcessingPipeline pipeline;
+        try {
+            pipeline = new ActionProcessingPipeline(_config);
+            pipeline.initialise();
+        }
+        catch (final ConfigurationException ce) {
+            throw new ManagedLifecycleException("Error configuring action processing pipeline", ce);
+        }
+        this.pipeline = pipeline;
+        final TwoWayCourier pickUpCourier;
+        try {
+            pickUpCourier = CourierFactory.getPickupCourier(_epr);
+            try {
+                final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                        "setPollLatency", new Class[]{Long.class});
+                setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
             }
-            
-            _pickUpCourier = pickUpCourier ;
-         
-            try
-            {
-                RegistryUtil.register(_config, _epr);
+            catch (final NoSuchMethodException nsme) {
+                // OK, just leave it null
             }
-            catch (final RegistryException re)
-            {
-                CourierUtil.cleanCourier(_pickUpCourier);
-                throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+            catch (final Exception ex) {
+                CourierUtil.cleanCourier(pickUpCourier);
+                throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
             }
         }
-        
-        /**
-         * Handle the start of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while starting.
-         */
-        protected void doStart()
-            throws ManagedLifecycleException
-        {
-            checkExecutorTermination() ;
-            
-            _execService = Executors.newFixedThreadPool(_maxThreads) ;
-            
-            super.doStart() ;
+        catch (final MalformedEPRException mepre) {
+            throw new ManagedLifecycleException("Malformed EPR: " + _epr);
         }
-        
-        /**
-         * Execute on the thread.
-         */
-        protected void doRun()
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("doRun() method of " + this.getClass().getSimpleName()
-                            + " started on thread " + Thread.currentThread().getName());
-            }
-            
-            while (isRunning())
-            {
-                // Only pickup a message when a thread is available
-                if (waitForThread(_pauseLapseInMillis))
-                {
-                    waitForEventAndProcess(100) ;
-                }
-            }
+        catch (final CourierException ce) {
+            throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+        }
 
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("run() method of " + this.getClass().getSimpleName()
-                            + " finished on thread " + Thread.currentThread().getName());
+        _pickUpCourier = pickUpCourier;
+
+        try {
+            RegistryUtil.register(_config, _epr);
+        }
+        catch (final RegistryException re) {
+            CourierUtil.cleanCourier(_pickUpCourier);
+            throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+        }
+    }
+
+    /**
+     * Handle the start of the managed instance.
+     *
+     * @throws ManagedLifecycleException for errors while starting.
+     */
+    protected void doStart()
+            throws ManagedLifecycleException {
+        checkExecutorTermination();
+
+        _execService = Executors.newFixedThreadPool(_maxThreads);
+
+        super.doStart();
+    }
+
+    /**
+     * Execute on the thread.
+     */
+    protected void doRun() {
+        if (_logger.isDebugEnabled()) {
+            _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+                    + " started on thread " + Thread.currentThread().getName());
+        }
+
+        while (isRunning()) {
+            // Only pickup a message when a thread is available
+            if (waitForThread(_pauseLapseInMillis)) {
+                waitForEventAndProcess(100);
             }
         }
-        
-	public void waitForEventAndProcess (long maxWaitMillis)
-	{
-		final Message message ;
-		try
-		{
-			message = (maxWaitMillis > 0) ? _pickUpCourier
-					.pickup(maxWaitMillis) : null;
-                        errorDelay = 0 ;
-		}
-		catch (CourierTimeoutException e)
-		{
-			return;
-		}
-		catch (CourierException e)
-		{
-                        _logger.debug("Courier Exception", e);
-                        if (errorDelay == 0)
-                        {
-                            errorDelay = MIN_ERROR_DELAY ;
-                        }
-                        else if (errorDelay < MAX_ERROR_DELAY)
-                        {
-                            errorDelay <<= 1 ;
-                        }
-                        _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
-                        waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
-			return;
-		}
 
-		if (null != message)
-		{
-                    final Runnable pipelineRunner = new Runnable() {
-                        public void run() {
-                            try {
-                                pipeline.process(message) ;
-                            } finally {
-                                updateThreadCount(-1) ;
-                            }
-                        }
-                    } ;
-                    updateThreadCount(+1);
-                    _execService.execute(pipelineRunner);
-		}
+        if (_logger.isDebugEnabled()) {
+            _logger.debug("run() method of " + this.getClass().getSimpleName()
+                    + " finished on thread " + Thread.currentThread().getName());
+        }
+    }
 
-	} // ________________________________
-        
-        /**
-         * Handle the threaded destroy of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while destroying.
-         */
-        protected void doThreadedDestroy()
-            throws ManagedLifecycleException
-        {
-            if (_execService != null)
-            {
-                _execService.shutdown() ;
-                checkExecutorTermination() ;
+    public void waitForEventAndProcess(long maxWaitMillis) {
+        final Message message;
+        try {
+            message = (maxWaitMillis > 0) ? _pickUpCourier
+                    .pickup(maxWaitMillis) : null;
+            errorDelay = 0;
+        }
+        catch (CourierTimeoutException e) {
+            return;
+        }
+        catch (CourierException e) {
+            _logger.debug("Courier Exception", e);
+            if (errorDelay == 0) {
+                errorDelay = MIN_ERROR_DELAY;
+            } else if (errorDelay < MAX_ERROR_DELAY) {
+                errorDelay <<= 1;
             }
-            
-            pipeline.destroy() ;
-            pipeline = null ;
-            
-            CourierUtil.cleanCourier(_pickUpCourier);
-            RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
+            _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds");
+            waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay);
+            return;
         }
-        
-        /**
-         * Check that the existing executor has been closed down.
-         * @throws ManagedLifecycleException If executor tasks are still active.
-         */
-        private void checkExecutorTermination()
-            throws ManagedLifecycleException
-        {
-            if (_execService != null)
-            {
-                try
-                {
-                    try
-                    {
-                        if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
-                        {
-                            throw new ManagedLifecycleException("Tasks remain active in executor") ;
-                        }
+
+        if (null != message) {
+            final Runnable pipelineRunner = new Runnable() {
+                public void run() {
+                    try {
+                        pipeline.process(message);
+                    } finally {
+                        updateThreadCount(-1);
                     }
-                    catch (final InterruptedException ie)
-                    {
-                        throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
+                }
+            };
+            updateThreadCount(+1);
+            _execService.execute(pipelineRunner);
+        }
+
+    } // ________________________________
+
+    /**
+     * Handle the threaded destroy of the managed instance.
+     *
+     * @throws ManagedLifecycleException for errors while destroying.
+     */
+    protected void doThreadedDestroy()
+            throws ManagedLifecycleException {
+        if (_execService != null) {
+            _execService.shutdown();
+            checkExecutorTermination();
+        }
+
+        pipeline.destroy();
+        pipeline = null;
+
+        CourierUtil.cleanCourier(_pickUpCourier);
+        RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+    }
+
+    /**
+     * Check that the existing executor has been closed down.
+     *
+     * @throws ManagedLifecycleException If executor tasks are still active.
+     */
+    private void checkExecutorTermination()
+            throws ManagedLifecycleException {
+        if (_execService != null) {
+            try {
+                try {
+                    if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS)) {
+                        throw new ManagedLifecycleException("Tasks remain active in executor");
                     }
                 }
-                finally
-                {
-                    _execService = null ;
+                catch (final InterruptedException ie) {
+                    throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate");
                 }
             }
+            finally {
+                _execService = null;
+            }
         }
+    }
 
-        private boolean waitForThread(final long delay)
-        {
-            boolean result = true ;
-            synchronized(_synchThreads)
-            {
-                if (_qRunningThreads >= _maxThreads)
-                {
-                    try
-                    {
-                        _synchThreads.wait(delay) ;
-                    }
-                    catch (final InterruptedException ie) {}
-                    result = _qRunningThreads < _maxThreads ;
+    private boolean waitForThread(final long delay) {
+        boolean result = true;
+        synchronized (_synchThreads) {
+            if (_qRunningThreads >= _maxThreads) {
+                try {
+                    _synchThreads.wait(delay);
                 }
+                catch (final InterruptedException ie) {
+                }
+                result = _qRunningThreads < _maxThreads;
             }
-            return result ;
         }
+        return result;
+    }
 
-        private void updateThreadCount(Integer i)
-        {
-            synchronized (_synchThreads)
-            {
-                _qRunningThreads += i.intValue();
-                if (_qRunningThreads < _maxThreads)
-                {
-                    _synchThreads.notifyAll() ;
-                }
+    private void updateThreadCount(Integer i) {
+        synchronized (_synchThreads) {
+            _qRunningThreads += i.intValue();
+            if (_qRunningThreads < _maxThreads) {
+                _synchThreads.notifyAll();
             }
         }
+    }
 
-        private ConfigTree _config;
+    private ConfigTree _config;
 
-        private String _eprCategoryName;
+    private String _eprCategoryName;
 
-        private String _eprName;
+    private String _eprName;
 
-        private EPR _epr;
+    private EPR _epr;
 
-        private int _maxThreads;
+    private int _maxThreads;
 
-        private int _defaultMaxThreads = 1;
-        
-        private long _latencySecs;
-        
-        private long _pauseLapseInMillis = 50 ;
+    private int _defaultMaxThreads = 1;
 
-        private ExecutorService _execService;
+    private long _latencySecs;
 
-        private Object _synchThreads = new Short((short) -1);
-        
-        private int _qRunningThreads;
+    private long _pauseLapseInMillis = 50;
 
-        private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-        
-        private PickUpOnlyCourier _pickUpCourier;
+    private ExecutorService _execService;
+
+    private Object _synchThreads = new Short((short) -1);
+
+    private int _qRunningThreads;
+
+    private Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+    private PickUpOnlyCourier _pickUpCourier;
 }




More information about the jboss-svn-commits mailing list