[jboss-svn-commits] JBL Code SVN: r8278 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Dec 13 05:13:53 EST 2006


Author: estebanschifman
Date: 2006-12-13 05:13:49 -0500 (Wed, 13 Dec 2006)
New Revision: 8278

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
ActionProcessingPipeline now extends Observable - new transport agnostic MessageAwareListener

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-12-13 08:46:29 UTC (rev 8277)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-12-13 10:13:49 UTC (rev 8278)
@@ -3,6 +3,7 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Observable;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
@@ -19,7 +20,7 @@
  * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
  * @since Version 4.0
  */
-public class ActionProcessingPipeline implements Runnable 
+public class ActionProcessingPipeline extends Observable implements Runnable 
 {    
 	private boolean processingComplete = false;
          
@@ -125,8 +126,12 @@
         catch(IllegalAccessException e)
     	{	_logger.error(prematureTermination("unable to access method"),e); } 
         catch (IllegalArgumentException e) {
-        } finally {
+        } 
+        finally 
+        {
         	processingComplete = true;
+        	notifyObservers(new Integer(-1));
+        	setChanged();
         }
     }
     

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-13 08:46:29 UTC (rev 8277)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-13 10:13:49 UTC (rev 8278)
@@ -0,0 +1,260 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+import java.lang.reflect.Method;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Esb Message aware transport independent listener.
+ * <p/> Relies on the CourierFactory to obtain an appropriate Courier 
+ * for the EPR this listener will be listening on
+ * <br/>Keeps a thread pool to instantiate ActionProcessingPipelines whenever a Message is received 
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+
+public class MessageAwareListener implements Runnable, Observer
+{
+	
+	/**
+	 * public constructor
+	 * @param controller EsbListenerController - the controlling process
+	 * @param config ConfigTree - Containing 'static' configuration for this instance
+	 * @throws Exception
+	 */
+    public MessageAwareListener(EsbListenerController controller, ConfigTree config) 
+    	throws ConfigurationException 
+    {
+        _millisToWaitForThread = 50;
+        _defaultMaxThreads	= 1;
+		_controller 	= controller;
+		_config 		= config;
+    	
+    	checkMyParms();
+    } // _______________________________
+    
+    // Child threads will send a -1  when their run() method ends
+    // we need to prevent picking up Messages when there are no available threads in pool
+	public void update(Observable o, Object arg) 
+	{
+		if (arg instanceof Integer)
+		{
+			_qRunningThreads += ((Integer)arg).intValue();
+	        _logger.debug("Thread returned to pool("+_qRunningThreads+"/"+_maxThreads);
+		}
+	} //________________________________
+	
+    /**
+     * Loops until controlling process determines
+     * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
+     * <br/>When one is received, instantiates an action processing pipeline to process it
+     */
+    public void run()
+    {
+        _logger.debug("run() method of "+this.getClass().getSimpleName()
+        		+" started on thread "+Thread.currentThread().getName());
+
+        boolean bRegisterOK = false;
+        try   
+        {
+        	registerProcess();
+        	bRegisterOK = true;
+        }
+		catch (Exception re) 
+			{ _logger.fatal("Could not register service " + re.getLocalizedMessage(),re); }
+		
+		if (bRegisterOK)
+		{
+			// Instantiate the pool thread and set active count to zero
+			_execService = Executors.newFixedThreadPool(_maxThreads);
+			_qRunningThreads = 0;
+			
+	    	while (_controller.continueLooping())
+	        {
+	    		// Check to see if there are available threads in pool
+	    		if (_qRunningThreads >= _maxThreads)
+	    			try 
+	    			{
+	    				Thread.sleep(_millisToWaitForThread);
+	    				continue;
+	    			}
+	    			catch (InterruptedException e) 
+	    			{
+	    				break; 
+	    			}
+
+	    		// Only pickup a message when a thread is available
+	    		long lWait = _controller.millisToWait();
+
+	    		// Just in case (this should never happen - it's a safety net)
+	    		if (lWait< 1)
+	    			continue;
+	    		
+	    		Message message = null;
+	    		try { message = (lWait > 0 ) ? _pickUpCourier.pickup(lWait) : null; }
+	    		catch (CourierTimeoutException e) { continue; }
+	    		catch (CourierException e) 
+	    		{
+	    			_logger.error(e);
+	    			continue;
+	    		}
+
+	        	if (null!=message)
+	        	{	
+	        		ActionProcessingPipeline chain = null;
+	
+	        		try	
+	        		{ 
+	        			chain = new ActionProcessingPipeline(message,_config);
+	        			chain.addObserver(this);
+	        			_qRunningThreads++;
+	        	        _logger.debug("Thread claimed from pool("+_qRunningThreads+"/"+_maxThreads);
+	        			_execService.execute(chain);
+	        		}
+	        		catch (Exception e)	{	_logger.error(e); 	continue; }
+	
+	        	}
+	        }
+	    	
+		}
+    	try { unregisterProcess(); }
+	   	catch (Exception re) 
+	   	{
+	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
+	   	}
+		// shutdown should wait for all child threads to finish
+		if (null!=_execService)
+			_execService.shutdown();
+		
+        _logger.debug("run() method of "+this.getClass().getSimpleName()
+        		+" finished on thread "+Thread.currentThread().getName());
+    } // _______________________________
+    
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws Exception -
+     *             if mandatory atts are not right or actionClass not in
+     *             classpath
+     */
+    protected void checkMyParms() throws ConfigurationException 
+    {
+        _eprCategoryName= obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
+        _eprName		= obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);
+        
+        String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+        if (!Util.isNullString(attr))
+        	try { _maxThreads = Integer.parseInt(attr); }
+        	catch (Exception e) { attr = null; }
+        if (Util.isNullString(attr))
+        {
+        	_maxThreads = _defaultMaxThreads;
+        	_logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+        			+" - Using default value of <"+_maxThreads+">");
+        }
+
+    } // ________________________________
+    
+    private void registerProcess()  throws Exception
+    {
+    	_bRegistered = false;
+    	_controller.register(_config,_epr);
+    	_bRegistered = true;
+    	_pickUpCourier = CourierFactory.getPickupCourier(_epr);
+    } // ________________________________
+
+    private void unregisterProcess()  throws Exception
+    {
+    	if (null!=_pickUpCourier)
+	    	try
+	    	{
+	    		Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
+	    		cleanMethod.invoke(_pickUpCourier, new Object[] {});
+	    	}
+	    	catch (NoSuchMethodException e) {/*  OK  Just don't invoke it  */  }
+	    	catch (Exception e)
+	    	{
+	    		_logger.error("Problems invoking courier.clean() Method",e);
+	    	}
+
+	    if (_bRegistered)
+	    	_controller.unRegister(_eprCategoryName, _eprName, _epr);    	
+
+    } // ________________________________
+
+    protected String obtainAttribute(String p_sAtt, String p_sDefault)
+		throws ConfigurationException 
+	{
+    	_logger.info("Reading value for " + p_sAtt);
+    	String sVal = _config.getAttribute(p_sAtt);
+    	if ((null == sVal) && (null == p_sDefault))
+    		throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+	
+    	return (null != sVal) ? sVal : p_sDefault;
+	} // ________________________________
+    
+	public void waitUntilReadyToPickup() 
+	{
+		while(null==_execService) 
+			try { Thread.sleep(50); }
+			catch (InterruptedException e) 
+				{ _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e); }
+	} //________________________________
+
+
+	protected ConfigTree 			_config = null;
+	protected EsbListenerController _controller = null;
+
+	protected String            _eprCategoryName;
+    protected String			_eprName;
+    protected EPR				_epr;
+    
+    protected int				_maxThreads;
+    protected int				_qRunningThreads;
+    protected int				_defaultMaxThreads;
+    protected long				_millisToWaitForThread;
+    protected ExecutorService	_execService;
+    protected boolean			_bRegistered;
+    
+    protected PickUpOnlyCourier _pickUpCourier;
+
+    protected static transient Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+} 




More information about the jboss-svn-commits mailing list