[jboss-svn-commits] JBL Code SVN: r10249 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Mar 15 22:10:28 EDT 2007


Author: bill.burke at jboss.com
Date: 2007-03-15 22:10:28 -0400 (Thu, 15 Mar 2007)
New Revision: 10249

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
formatting

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-03-16 01:59:04 UTC (rev 10248)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-03-16 02:10:28 UTC (rev 10249)
@@ -22,13 +22,6 @@
 
 package org.jboss.soa.esb.listeners.message;
 
-import java.lang.reflect.Method;
-import java.util.Observable;
-import java.util.Observer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.ConfigurationException;
@@ -50,325 +43,332 @@
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
 
+import java.lang.reflect.Method;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Esb Message aware transport independent listener. <p/> Relies on the
  * CourierFactory to obtain an appropriate Courier for the EPR this listener
  * will be listening on <br/>Keeps a thread pool to instantiate
  * ActionProcessingPipelines whenever a Message is received
- * 
+ *
  * @author <a
  *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
  * @since Version 4.0
  */
 
-public class MessageAwareListener  extends AbstractThreadedManagedLifecycle implements Observer
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle implements Observer
 {
-        /**
-         * serial version uid for this class
-         */
-        private static final long serialVersionUID = -9198018611828254359L;
-    
-        /**
-	 * public constructor
-	 * 
-	 * @param config
-	 *            ConfigTree - Containing 'static' configuration for this
-	 *            instance
-	 * @throws ConfigurationException
-	 */
-	public MessageAwareListener(final ConfigTree config)
-			throws ConfigurationException
-	{
-            super(config);
-            _config = config ;
-            checkMyParms() ;
-	}
+   /**
+    * serial version uid for this class
+    */
+   private static final long serialVersionUID = -9198018611828254359L;
 
-	/**
-	 * Check for mandatory and optional attributes in parameter tree
-	 * 
-	 * @throws ConfigurationException -
-	 *             if mandatory atts are not right or actionClass not in
-	 *             classpath
-	 */
-	protected void checkMyParms () throws ConfigurationException
-	{
-                _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-                _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+   /**
+    * public constructor
+    *
+    * @param config ConfigTree - Containing 'static' configuration for this
+    *               instance
+    * @throws ConfigurationException
+    */
+   public MessageAwareListener(final ConfigTree config)
+           throws ConfigurationException
+   {
+      super(config);
+      _config = config;
+      checkMyParms();
+   }
 
-                final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-                
-                if (!Util.isNullString(maxThreadVal))
-                {
-                    try
-                    {
-                        _maxThreads = Integer.parseInt(maxThreadVal) ;
-                    }
-                    catch (NumberFormatException nfe)
-                    {
-                        _maxThreads = _defaultMaxThreads ;
-                        _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
-                    }
-                }
+   /**
+    * Check for mandatory and optional attributes in parameter tree
+    *
+    * @throws ConfigurationException -
+    *                                if mandatory atts are not right or actionClass not in
+    *                                classpath
+    */
+   protected void checkMyParms() throws ConfigurationException
+   {
+      _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+      _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
 
-		if (Util.isNullString(_eprCategoryName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-		if (Util.isNullString(_eprName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+      final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
 
-		ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
-		if (null == eprElement)
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
-		_epr = ListenerUtil.assembleEpr(eprElement);
-                
-                String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
-                long lSeconds = 10;
-                if (null != latency)
-                {
-                    try
-                    {
-                        lSeconds = Integer.parseInt(latency);
-                    }
-                    catch (NumberFormatException e)
-                    {
-                        _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
-                    }
-                }
-                _latencySecs = lSeconds ;                
-	}
-        
-        /**
-         * Handle the initialisation of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while initialisation.
-         */
-        protected void doInitialise()
-            throws ManagedLifecycleException
-        {
-            final TwoWayCourier pickUpCourier ;
+      if (!Util.isNullString(maxThreadVal))
+      {
+         try
+         {
+            _maxThreads = Integer.parseInt(maxThreadVal);
+         }
+         catch (NumberFormatException nfe)
+         {
+            _maxThreads = _defaultMaxThreads;
+            _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
+         }
+      }
+
+      if (Util.isNullString(_eprCategoryName))
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+      if (Util.isNullString(_eprName))
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+      ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+      if (null == eprElement)
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+      _epr = ListenerUtil.assembleEpr(eprElement);
+
+      String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+      long lSeconds = 10;
+      if (null != latency)
+      {
+         try
+         {
+            lSeconds = Integer.parseInt(latency);
+         }
+         catch (NumberFormatException e)
+         {
+            _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
+         }
+      }
+      _latencySecs = lSeconds;
+   }
+
+   /**
+    * Handle the initialisation of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors while initialisation.
+    */
+   protected void doInitialise()
+           throws ManagedLifecycleException
+   {
+      final TwoWayCourier pickUpCourier;
+      try
+      {
+         pickUpCourier = CourierFactory.getPickupCourier(_epr);
+         try
+         {
+            final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                    "setPollLatency", new Class[]{Long.class});
+            setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+         }
+         catch (final NoSuchMethodException nsme)
+         {
+            // OK, just leave it null
+         }
+         catch (final Exception ex)
+         {
+            CourierUtil.cleanCourier(pickUpCourier);
+            throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
+         }
+      }
+      catch (final MalformedEPRException mepre)
+      {
+         throw new ManagedLifecycleException("Malformed EPR: " + _epr);
+      }
+      catch (final CourierException ce)
+      {
+         throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+      }
+
+      _pickUpCourier = pickUpCourier;
+
+      try
+      {
+         RegistryUtil.register(_config, _epr);
+      }
+      catch (final RegistryException re)
+      {
+         CourierUtil.cleanCourier(_pickUpCourier);
+         throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+      }
+   }
+
+   /**
+    * Handle the start of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors while starting.
+    */
+   protected void doStart()
+           throws ManagedLifecycleException
+   {
+      checkExecutorTermination();
+
+      _execService = Executors.newFixedThreadPool(_maxThreads);
+
+      super.doStart();
+   }
+
+   /**
+    * Execute on the thread.
+    */
+   protected void doRun()
+   {
+      if (_logger.isDebugEnabled())
+      {
+         _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+                 + " started on thread " + Thread.currentThread().getName());
+      }
+
+      while (isRunning())
+      {
+         // Only pickup a message when a thread is available
+         if (getActiveThreadCount() >= _maxThreads)
+         {
+            waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _pauseLapseInMillis);
+         }
+         else
+         {
+            waitForEventAndProcess(100);
+         }
+      }
+      if (null != _execService)
+         _execService.shutdown();
+
+      if (_logger.isDebugEnabled())
+      {
+         _logger.debug("run() method of " + this.getClass().getSimpleName()
+                 + " finished on thread " + Thread.currentThread().getName());
+      }
+   }
+
+   public void waitForEventAndProcess(long maxWaitMillis)
+   {
+      Message message = null;
+      try
+      {
+         message = (maxWaitMillis > 0) ? _pickUpCourier
+                 .pickup(maxWaitMillis) : null;
+      }
+      catch (CourierTimeoutException e)
+      {
+         return;
+      }
+      catch (CourierException e)
+      {
+         _logger.error("Courier Exception", e);
+         return;
+      }
+
+      if (null != message)
+      {
+         ActionProcessingPipeline chain = null;
+
+         try
+         {
+            chain = new ActionProcessingPipeline(message, _config);
+            chain.addObserver(this);
+            updateThreadCount(+1);
+            _execService.execute(chain);
+         }
+         catch (Exception e)
+         {
+            _logger.error("ActionProcessingPipeline exception", e);
+            return;
+         }
+      }
+
+   } // ________________________________
+
+   /**
+    * Handle the destroy of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors while destroying.
+    */
+   protected void doDestroy()
+           throws ManagedLifecycleException
+   {
+      checkExecutorTermination();
+
+      CourierUtil.cleanCourier(_pickUpCourier);
+
+      RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+   }
+
+   /**
+    * Check that the existing executor has been closed down.
+    *
+    * @throws ManagedLifecycleException If executor tasks are still active.
+    */
+   private void checkExecutorTermination()
+           throws ManagedLifecycleException
+   {
+      if (_execService != null)
+      {
+         try
+         {
             try
             {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
-                }
+               if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
+               {
+                  throw new ManagedLifecycleException("Tasks remain active in executor");
+               }
             }
-            catch (final MalformedEPRException mepre)
+            catch (final InterruptedException ie)
             {
-                throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+               throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate");
             }
-            catch (final CourierException ce)
-            {
-                throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
-            }
-            
-            _pickUpCourier = pickUpCourier ;
-         
-            try
-            {
-                RegistryUtil.register(_config, _epr);
-            }
-            catch (final RegistryException re)
-            {
-                CourierUtil.cleanCourier(_pickUpCourier);
-                throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
-            }
-        }
-        
-        /**
-         * Handle the start of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while starting.
-         */
-        protected void doStart()
-            throws ManagedLifecycleException
-        {
-            checkExecutorTermination() ;
-            
-            _execService = Executors.newFixedThreadPool(_maxThreads) ;
-            
-            super.doStart() ;
-        }
-        
-        /**
-         * Execute on the thread.
-         */
-        protected void doRun()
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("doRun() method of " + this.getClass().getSimpleName()
-                            + " started on thread " + Thread.currentThread().getName());
-            }
-            
-            while (isRunning())
-            {
-                // Only pickup a message when a thread is available
-                if (getActiveThreadCount() >= _maxThreads)
-                {
-                    waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _pauseLapseInMillis) ;
-                }
-                else
-                {
-                    waitForEventAndProcess(100) ;
-                }
-            }
-            if (null != _execService)
-                _execService.shutdown();
+         }
+         finally
+         {
+            _execService = null;
+         }
+      }
+   }
 
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("run() method of " + this.getClass().getSimpleName()
-                            + " finished on thread " + Thread.currentThread().getName());
-            }
-        }
-        
-	public void waitForEventAndProcess (long maxWaitMillis)
-	{
-		Message message = null;
-		try
-		{
-			message = (maxWaitMillis > 0) ? _pickUpCourier
-					.pickup(maxWaitMillis) : null;
-		}
-		catch (CourierTimeoutException e)
-		{
-			return;
-		}
-		catch (CourierException e)
-		{
-			_logger.error("Courier Exception", e);
-			return;
-		}
+   // Child threads will send a -1 when their run() method ends
+   // we need to prevent picking up Messages when there are no available
+   // threads in pool
+   public void update(Observable o, Object arg)
+   {
+      if (arg instanceof Integer)
+         updateThreadCount((Integer) arg);
+   }
 
-		if (null != message)
-		{
-			ActionProcessingPipeline chain = null;
+   private int getActiveThreadCount()
+   {
+      synchronized (_synchThreads)
+      {
+         return _qRunningThreads;
+      }
+   }
 
-			try
-			{
-				chain = new ActionProcessingPipeline(message, _config);
-				chain.addObserver(this);
-				updateThreadCount(+1);
-				_execService.execute(chain);
-			}
-			catch (Exception e)
-			{
-				_logger.error("ActionProcessingPipeline exception", e);
-				return;
-			}
-		}
+   private void updateThreadCount(Integer i)
+   {
+      synchronized (_synchThreads)
+      {
+         _qRunningThreads += i.intValue();
+      }
+   }
 
-	} // ________________________________
-        
-        /**
-         * Handle the destroy of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while destroying.
-         */
-        protected void doDestroy()
-            throws ManagedLifecycleException
-        {
-            checkExecutorTermination() ;
-            
-            CourierUtil.cleanCourier(_pickUpCourier);
-            
-            RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
-        }
-        
-        /**
-         * Check that the existing executor has been closed down.
-         * @throws ManagedLifecycleException If executor tasks are still active.
-         */
-        private void checkExecutorTermination()
-            throws ManagedLifecycleException
-        {
-            if (_execService != null)
-            {
-                try
-                {
-                    try
-                    {
-                        if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
-                        {
-                            throw new ManagedLifecycleException("Tasks remain active in executor") ;
-                        }
-                    }
-                    catch (final InterruptedException ie)
-                    {
-                        throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
-                    }
-                }
-                finally
-                {
-                    _execService = null ;
-                }
-            }
-        }
-        
-        // Child threads will send a -1 when their run() method ends
-        // we need to prevent picking up Messages when there are no available
-        // threads in pool
-        public void update(Observable o, Object arg)
-        {
-            if (arg instanceof Integer)
-                updateThreadCount((Integer) arg);
-        }
+   private ConfigTree _config;
 
-        private int getActiveThreadCount()
-        {
-            synchronized (_synchThreads)
-            {
-                return _qRunningThreads ;
-            }
-        }
+   private String _eprCategoryName;
 
-        private void updateThreadCount(Integer i)
-        {
-            synchronized (_synchThreads)
-            {
-                _qRunningThreads += i.intValue();
-            }
-        }
+   private String _eprName;
 
-        private ConfigTree _config;
+   private EPR _epr;
 
-        private String _eprCategoryName;
+   private int _maxThreads;
 
-        private String _eprName;
+   private int _defaultMaxThreads;
 
-        private EPR _epr;
+   private long _latencySecs;
 
-        private int _maxThreads;
+   private long _pauseLapseInMillis = 50;
 
-        private int _defaultMaxThreads;
-        
-        private long _latencySecs;
-        
-        private long _pauseLapseInMillis = 50 ;
+   private ExecutorService _execService;
 
-        private ExecutorService _execService;
+   private Object _synchThreads = new Short((short) -1);
 
-        private Object _synchThreads = new Short((short) -1);
-        
-        private int _qRunningThreads;
+   private int _qRunningThreads;
 
-        private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-        
-        private PickUpOnlyCourier _pickUpCourier;
+   private Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+   private PickUpOnlyCourier _pickUpCourier;
 }




More information about the jboss-svn-commits mailing list