[jboss-svn-commits] JBL Code SVN: r11104 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Apr 18 18:31:00 EDT 2007


Author: bill.burke at jboss.com
Date: 2007-04-18 18:31:00 -0400 (Wed, 18 Apr 2007)
New Revision: 11104

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
formatting

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-04-18 20:07:44 UTC (rev 11103)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2007-04-18 22:31:00 UTC (rev 11104)
@@ -53,369 +53,376 @@
  * CourierFactory to obtain an appropriate Courier for the EPR this listener
  * will be listening on <br/>Keeps a thread pool to instantiate
  * ActionProcessingPipelines whenever a Message is received
- * 
+ *
  * @author <a
  *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
  * @since Version 4.0
  */
 
-public class MessageAwareListener  extends AbstractThreadedManagedLifecycle
+public class MessageAwareListener extends AbstractThreadedManagedLifecycle
 {
-        /**
-         * serial version uid for this class
-         */
-        private static final long serialVersionUID = -9198018611828254359L;
-        
-        /**
-         * The minimum error delay.
-         */
-        private static final long MIN_ERROR_DELAY = 1000 ;
-        /**
-         * The maximum error delay.
-         */
-        private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5) ;
-        
-        /**
-         * The action pipeline.
-         */
-        private ActionProcessingPipeline pipeline ;
-    
-        /**
-         * The error delay.
-         */
-        private long errorDelay ;
-        
-        /**
-	 * public constructor
-	 * 
-	 * @param config
-	 *            ConfigTree - Containing 'static' configuration for this
-	 *            instance
-	 * @throws ConfigurationException
-	 */
-	public MessageAwareListener(final ConfigTree config)
-			throws ConfigurationException
-	{
-            super(config);
-            _config = config ;
-            checkMyParms() ;
-	}
+   /**
+    * serial version uid for this class
+    */
+   private static final long serialVersionUID = -9198018611828254359L;
 
-	/**
-	 * Check for mandatory and optional attributes in parameter tree
-	 * 
-	 * @throws ConfigurationException -
-	 *             if mandatory atts are not right or actionClass not in
-	 *             classpath
-	 */
-	protected void checkMyParms () throws ConfigurationException
-	{
-                _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-                _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+   /**
+    * The minimum error delay.
+    */
+   private static final long MIN_ERROR_DELAY = 1000;
+   /**
+    * The maximum error delay.
+    */
+   private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
 
-                final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG) ;
-                
-                if (!Util.isNullString(maxThreadVal))
-                {
-                    try
-                    {
-                        _maxThreads = Integer.parseInt(maxThreadVal) ;
-                    }
-                    catch (NumberFormatException nfe)
-                    {
-                        _maxThreads = _defaultMaxThreads ;
-                        _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">") ;
-                    }
-                }
+   /**
+    * The action pipeline.
+    */
+   private ActionProcessingPipeline pipeline;
 
-		if (Util.isNullString(_eprCategoryName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-		if (Util.isNullString(_eprName))
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+   /**
+    * The error delay.
+    */
+   private long errorDelay;
 
-		ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
-		if (null == eprElement)
-			throw new ConfigurationException(
-					"Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
-		_epr = ListenerUtil.assembleEpr(eprElement);
-                
-                String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
-                long lSeconds = 10;
-                if (null != latency)
-                {
-                    try
-                    {
-                        lSeconds = Integer.parseInt(latency);
-                    }
-                    catch (NumberFormatException e)
-                    {
-                        _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
-                    }
-                }
-                _latencySecs = lSeconds ;                
-	}
-        
-        /**
-         * Handle the initialisation of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while initialisation.
-         */
-        protected void doInitialise()
-            throws ManagedLifecycleException
-        {
-            final ActionProcessingPipeline pipeline ;
-            try
+   /**
+    * public constructor
+    *
+    * @param config ConfigTree - Containing 'static' configuration for this
+    *               instance
+    * @throws ConfigurationException
+    */
+   public MessageAwareListener(final ConfigTree config)
+           throws ConfigurationException
+   {
+      super(config);
+      _config = config;
+      checkMyParms();
+   }
+
+   /**
+    * Check for mandatory and optional attributes in parameter tree
+    *
+    * @throws ConfigurationException -
+    *                                if mandatory attributes are missing or invalid, or
+    *                                actionClass is not in the classpath
+    */
+   protected void checkMyParms() throws ConfigurationException
+   {
+      _eprCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+      _eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+
+      final String maxThreadVal = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
+
+      if (!Util.isNullString(maxThreadVal))
+      {
+         try
+         {
+            _maxThreads = Integer.parseInt(maxThreadVal);
+         }
+         catch (NumberFormatException nfe)
+         {
+            _maxThreads = _defaultMaxThreads;
+            _logger.warn("Invalid " + ListenerTagNames.MAX_THREADS_TAG + " attribute, defaulting to <" + _maxThreads + ">");
+         }
+      }
+
+      if (Util.isNullString(_eprCategoryName))
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+      if (Util.isNullString(_eprName))
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.SERVICE_NAME_TAG);
+
+      ConfigTree eprElement = _config.getFirstChild(ListenerTagNames.EPR_TAG);
+      if (null == eprElement)
+         throw new ConfigurationException(
+                 "Missing or invalid " + ListenerTagNames.EPR_TAG + " element");
+      _epr = ListenerUtil.assembleEpr(eprElement);
+
+      String latency = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+      long lSeconds = 10;
+      if (null != latency)
+      {
+         try
+         {
+            lSeconds = Integer.parseInt(latency);
+         }
+         catch (NumberFormatException e)
+         {
+            _logger.warn("Invalid number format <" + latency + "> using default value (" + lSeconds + ")");
+         }
+      }
+      _latencySecs = lSeconds;
+   }
+
+   /**
+    * Handle the initialisation of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors during initialisation.
+    */
+   protected void doInitialise()
+           throws ManagedLifecycleException
+   {
+      final ActionProcessingPipeline pipeline;
+      try
+      {
+         pipeline = new ActionProcessingPipeline(_config);
+         pipeline.initialise();
+      }
+      catch (final ConfigurationException ce)
+      {
+         throw new ManagedLifecycleException("Error configuring action processing pipeline", ce);
+      }
+      this.pipeline = pipeline;
+      final TwoWayCourier pickUpCourier;
+      try
+      {
+         pickUpCourier = CourierFactory.getPickupCourier(_epr);
+         try
+         {
+            final Method setPollLatency = pickUpCourier.getClass().getMethod(
+                    "setPollLatency", new Class[]{Long.class});
+            setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
+         }
+         catch (final NoSuchMethodException nsme)
+         {
+            // OK, just leave it null
+         }
+         catch (final Exception ex)
+         {
+            CourierUtil.cleanCourier(pickUpCourier);
+            throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex);
+         }
+      }
+      catch (final MalformedEPRException mepre)
+      {
+         throw new ManagedLifecycleException("Malformed EPR: " + _epr);
+      }
+      catch (final CourierException ce)
+      {
+         throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
+      }
+
+      _pickUpCourier = pickUpCourier;
+
+      try
+      {
+         RegistryUtil.register(_config, _epr);
+      }
+      catch (final RegistryException re)
+      {
+         CourierUtil.cleanCourier(_pickUpCourier);
+         throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
+      }
+   }
+
+   /**
+    * Handle the start of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors while starting.
+    */
+   protected void doStart()
+           throws ManagedLifecycleException
+   {
+      checkExecutorTermination();
+
+      _execService = Executors.newFixedThreadPool(_maxThreads);
+
+      super.doStart();
+   }
+
+   /**
+    * Execute on the thread.
+    */
+   protected void doRun()
+   {
+      if (_logger.isDebugEnabled())
+      {
+         _logger.debug("doRun() method of " + this.getClass().getSimpleName()
+                 + " started on thread " + Thread.currentThread().getName());
+      }
+
+      while (isRunning())
+      {
+         // Only pickup a message when a thread is available
+         if (waitForThread(_pauseLapseInMillis))
+         {
+            waitForEventAndProcess(100);
+         }
+      }
+      if (null != _execService)
+         _execService.shutdown();
+
+      if (_logger.isDebugEnabled())
+      {
+         _logger.debug("run() method of " + this.getClass().getSimpleName()
+                 + " finished on thread " + Thread.currentThread().getName());
+      }
+   }
+
+   public void waitForEventAndProcess(long maxWaitMillis)
+   {
+      final Message message;
+      try
+      {
+         message = (maxWaitMillis > 0) ? _pickUpCourier
+                 .pickup(maxWaitMillis) : null;
+         errorDelay = 0;
+      }
+      catch (CourierTimeoutException e)
+      {
+         return;
+      }
+      catch (CourierException e)
+      {
+         _logger.error("Courier Exception", e);
+         if (errorDelay == 0)
+         {
+            errorDelay = MIN_ERROR_DELAY;
+         }
+         else if (errorDelay < MAX_ERROR_DELAY)
+         {
+            errorDelay <<= 1;
+         }
+         waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay);
+         return;
+      }
+
+      if (null != message)
+      {
+         try
+         {
+            final Runnable pipelineRunner = new Runnable()
             {
-                pipeline = new ActionProcessingPipeline(_config) ;
-                pipeline.initialise() ;
-            }
-            catch (final ConfigurationException ce)
-            {
-                throw new ManagedLifecycleException("Error configuring action processing pipeline", ce) ;
-            }
-            this.pipeline = pipeline ;
-            final TwoWayCourier pickUpCourier ;
+               public void run()
+               {
+                  try
+                  {
+                     pipeline.process(message);
+                  }
+                  finally
+                  {
+                     updateThreadCount(-1);
+                  }
+               }
+            };
+            updateThreadCount(+1);
+            _execService.execute(pipelineRunner);
+         }
+         catch (Exception e)
+         {
+            _logger.error("ActionProcessingPipeline exception", e);
+            return;
+         }
+      }
+
+   } // ________________________________
+
+   /**
+    * Handle the destroy of the managed instance.
+    *
+    * @throws ManagedLifecycleException for errors while destroying.
+    */
+   protected void doDestroy()
+           throws ManagedLifecycleException
+   {
+      checkExecutorTermination();
+
+      pipeline.destroy();
+      pipeline = null;
+
+      CourierUtil.cleanCourier(_pickUpCourier);
+
+      RegistryUtil.unregister(_eprCategoryName, _eprName, _epr);
+   }
+
+   /**
+    * Check that the existing executor has been closed down.
+    *
+    * @throws ManagedLifecycleException If executor tasks are still active.
+    */
+   private void checkExecutorTermination()
+           throws ManagedLifecycleException
+   {
+      if (_execService != null)
+      {
+         try
+         {
             try
             {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    throw new ManagedLifecycleException("Problems invoking setPollLatency(long)", ex) ;
-                }
+               if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
+               {
+                  throw new ManagedLifecycleException("Tasks remain active in executor");
+               }
             }
-            catch (final MalformedEPRException mepre)
+            catch (final InterruptedException ie)
             {
-                throw new ManagedLifecycleException("Malformed EPR: " + _epr) ;
+               throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate");
             }
-            catch (final CourierException ce)
-            {
-                throw new ManagedLifecycleException("No appropriate courier can be obtained for " + _epr, ce);
-            }
-            
-            _pickUpCourier = pickUpCourier ;
-         
+         }
+         finally
+         {
+            _execService = null;
+         }
+      }
+   }
+
+   private boolean waitForThread(final long delay)
+   {
+      boolean result = true;
+      synchronized (_synchThreads)
+      {
+         if (_qRunningThreads >= _maxThreads)
+         {
             try
             {
-                RegistryUtil.register(_config, _epr);
+               _synchThreads.wait(delay);
             }
-            catch (final RegistryException re)
+            catch (final InterruptedException ie)
             {
-                CourierUtil.cleanCourier(_pickUpCourier);
-                throw new ManagedLifecycleException("Unexpected error during registration for epr " + _epr, re);
             }
-        }
-        
-        /**
-         * Handle the start of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while starting.
-         */
-        protected void doStart()
-            throws ManagedLifecycleException
-        {
-            checkExecutorTermination() ;
-            
-            _execService = Executors.newFixedThreadPool(_maxThreads) ;
-            
-            super.doStart() ;
-        }
-        
-        /**
-         * Execute on the thread.
-         */
-        protected void doRun()
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("doRun() method of " + this.getClass().getSimpleName()
-                            + " started on thread " + Thread.currentThread().getName());
-            }
-            
-            while (isRunning())
-            {
-                // Only pickup a message when a thread is available
-                if (waitForThread(_pauseLapseInMillis))
-                {
-                    waitForEventAndProcess(100) ;
-                }
-            }
-            if (null != _execService)
-                _execService.shutdown();
+            result = _qRunningThreads < _maxThreads;
+         }
+      }
+      return result;
+   }
 
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("run() method of " + this.getClass().getSimpleName()
-                            + " finished on thread " + Thread.currentThread().getName());
-            }
-        }
-        
-	public void waitForEventAndProcess (long maxWaitMillis)
-	{
-		final Message message ;
-		try
-		{
-			message = (maxWaitMillis > 0) ? _pickUpCourier
-					.pickup(maxWaitMillis) : null;
-                        errorDelay = 0 ;
-		}
-		catch (CourierTimeoutException e)
-		{
-			return;
-		}
-		catch (CourierException e)
-		{
-			_logger.error("Courier Exception", e);
-                        if (errorDelay == 0)
-                        {
-                            errorDelay = MIN_ERROR_DELAY ;
-                        }
-                        else if (errorDelay < MAX_ERROR_DELAY)
-                        {
-                            errorDelay <<= 1 ;
-                        }
-                        waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
-			return;
-		}
+   private void updateThreadCount(Integer i)
+   {
+      synchronized (_synchThreads)
+      {
+         _qRunningThreads += i.intValue();
+         if (_qRunningThreads < _maxThreads)
+         {
+            _synchThreads.notifyAll();
+         }
+      }
+   }
 
-		if (null != message)
-		{
-			try
-			{
-                            final Runnable pipelineRunner = new Runnable() {
-                                public void run() {
-                                    try {
-                                        pipeline.process(message) ;
-                                    } finally {
-                                        updateThreadCount(-1) ;
-                                    }
-                                }
-                            } ;
-                            updateThreadCount(+1);
-                            _execService.execute(pipelineRunner);
-			}
-			catch (Exception e)
-			{
-				_logger.error("ActionProcessingPipeline exception", e);
-				return;
-			}
-		}
+   private ConfigTree _config;
 
-	} // ________________________________
-        
-        /**
-         * Handle the destroy of the managed instance.
-         * 
-         * @throws ManagedLifecycleException for errors while destroying.
-         */
-        protected void doDestroy()
-            throws ManagedLifecycleException
-        {
-            checkExecutorTermination() ;
-            
-            pipeline.destroy() ;
-            pipeline = null ;
-            
-            CourierUtil.cleanCourier(_pickUpCourier);
-            
-            RegistryUtil.unregister(_eprCategoryName, _eprName, _epr) ;
-        }
-        
-        /**
-         * Check that the existing executor has been closed down.
-         * @throws ManagedLifecycleException If executor tasks are still active.
-         */
-        private void checkExecutorTermination()
-            throws ManagedLifecycleException
-        {
-            if (_execService != null)
-            {
-                try
-                {
-                    try
-                    {
-                        if (!_execService.awaitTermination(getTerminationPeriod(), TimeUnit.MILLISECONDS))
-                        {
-                            throw new ManagedLifecycleException("Tasks remain active in executor") ;
-                        }
-                    }
-                    catch (final InterruptedException ie)
-                    {
-                        throw new ManagedLifecycleException("Interrupted waiting for active tasks to terminate") ;
-                    }
-                }
-                finally
-                {
-                    _execService = null ;
-                }
-            }
-        }
+   private String _eprCategoryName;
 
-        private boolean waitForThread(final long delay)
-        {
-            boolean result = true ;
-            synchronized(_synchThreads)
-            {
-                if (_qRunningThreads >= _maxThreads)
-                {
-                    try
-                    {
-                        _synchThreads.wait(delay) ;
-                    }
-                    catch (final InterruptedException ie) {}
-                    result = _qRunningThreads < _maxThreads ;
-                }
-            }
-            return result ;
-        }
+   private String _eprName;
 
-        private void updateThreadCount(Integer i)
-        {
-            synchronized (_synchThreads)
-            {
-                _qRunningThreads += i.intValue();
-                if (_qRunningThreads < _maxThreads)
-                {
-                    _synchThreads.notifyAll() ;
-                }
-            }
-        }
+   private EPR _epr;
 
-        private ConfigTree _config;
+   private int _maxThreads;
 
-        private String _eprCategoryName;
+   private int _defaultMaxThreads = 1;
 
-        private String _eprName;
+   private long _latencySecs;
 
-        private EPR _epr;
+   private long _pauseLapseInMillis = 50;
 
-        private int _maxThreads;
+   private ExecutorService _execService;
 
-        private int _defaultMaxThreads = 1;
-        
-        private long _latencySecs;
-        
-        private long _pauseLapseInMillis = 50 ;
+   private Object _synchThreads = new Short((short) -1);
 
-        private ExecutorService _execService;
+   private int _qRunningThreads;
 
-        private Object _synchThreads = new Short((short) -1);
-        
-        private int _qRunningThreads;
+   private Logger _logger = Logger.getLogger(MessageAwareListener.class);
 
-        private Logger _logger = Logger.getLogger(MessageAwareListener.class);
-        
-        private PickUpOnlyCourier _pickUpCourier;
+   private PickUpOnlyCourier _pickUpCourier;
 }




More information about the jboss-svn-commits mailing list