[jboss-svn-commits] JBL Code SVN: r8363 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/internal/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners/message listeners/src/org/jboss/soa/esb/listeners/newgateway rosetta/src/org/jboss/soa/esb/couriers rosetta/src/org/jboss/soa/esb/helpers

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Dec 16 04:41:10 EST 2006


Author: estebanschifman
Date: 2006-12-16 04:40:54 -0500 (Sat, 16 Dec 2006)
New Revision: 8363

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
Log:
More progress on new listener classes

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -27,7 +27,6 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Observable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -265,7 +264,7 @@
 			{
 				synchronized (_synchAllReady)
 				{
-					_childrenStarted = new HashMap<Class, Boolean>();
+					_childrenStarted = new HashMap<AbstractManagedListener, Boolean>();
 					for (ConfigTree oCurr : _config.getAllChildren()) 
 					{
 						String sClass 	= oCurr.getAttribute(PARM_LISTENER_CLASS);
@@ -305,19 +304,18 @@
 							+ " - Continuing with cached version", e);
 				}
 		}
-		// m_oState = State.Shutting_down;
+//		 _status = State.Shutting_down;
 
 		_status = State.Done_OK;
 		_status.setCompletionCode(0);
-		_logger.debug("Finishing_____________________________________________________");
 
 		// Close the command queue...
-		try {
+		try
+		{
 			if (null != _commandQueue)
 				_commandQueue.close();
-		} catch (CommandQueueException e) {
-			_logger.error("Error closing Command Queue.", e);
 		}
+		catch (CommandQueueException e) { _logger.debug("Error closing Command Queue.", e); }
 	} // ________________________________
 
 	private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName) 
@@ -329,7 +327,7 @@
 			AbstractManagedListener listener = (AbstractManagedListener)
 					oConst.newInstance(new Object[] { this,p_oP});
 
-			_childrenStarted.put(listener.getClass(), Boolean.FALSE);
+			_childrenStarted.put(listener, Boolean.FALSE);
 			((Observable)listener).addObserver(this);
 			new Thread(listener).start();
 		}
@@ -462,8 +460,12 @@
 	 */
 	public void requestEnd() 
 	{
+		_endTimeStamp = System.currentTimeMillis()+1000;
+		_nextReload = _endTimeStamp+100;
+		_logger.debug("Waiting for child threads to finish___________________________");
+		 waitForCompletion();
+		_logger.debug("ListenerManager ends__________________________________________");
 		_endRequested=true;
-		_endTimeStamp = 0;
 	}
 	
 	/* (non-Javadoc)
@@ -591,16 +593,6 @@
 		}
 	} //____________________________
 
-	public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException 
-	{
-		return null;
-	} //____________________________
-
-	public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException 
-	{
-		return null;
-	} //____________________________
-
 	// this method will typically run in the invoker's thread 
 	// (which btw might be the same as the run() thread if this not launched in a separate thread
 	public void waitUntilReady() 
@@ -627,11 +619,50 @@
 		} while (someChildPending);
 	} //____________________________
 	
+	public void xxx()
+	{
+		boolean someChildPending = true;
+		do
+		{
+			Collection<Boolean> allStarted = null;
+			synchronized (_synchAllReady)
+			{
+				allStarted = (null==_childrenStarted) ? null :  _childrenStarted.values(); 
+			}
+			if (null== allStarted)
+				break;
+			someChildPending = false;
+				for (Boolean curr : allStarted)
+					if (Boolean.TRUE.equals(curr))
+						someChildPending = true;
+			if (someChildPending)
+				try { Thread.sleep(_pauseTimeMillis); }
+				catch (InterruptedException e) {	return; }
+				
+		} while (someChildPending);
+	} //____________________________
+
+	public void waitForCompletion()
+	{
+		try { Thread.sleep(2000); }
+		catch (InterruptedException e) {}
+		boolean bRunning = true;
+		while (bRunning && null!=_childrenStarted && _childrenStarted.size()>0)
+		{
+			bRunning = false;
+			for (AbstractManagedListener curr :_childrenStarted.keySet())
+			{
+				if (curr.hasActiveTreads())
+					bRunning = true; 
+			}
+		}
+	} //____________________________
+	
 	// Child processes must let us know when they're ready
 	public void update(Observable o, Object arg) 
 	{
 		if (null!=_childrenStarted && (arg instanceof Boolean))
-			_childrenStarted.put(o.getClass(), (Boolean)arg);
+			_childrenStarted.put((AbstractManagedListener)o, (Boolean)arg);
 	} //____________________________
 
 
@@ -643,7 +674,7 @@
 	private State 			_status = State.Uninitialised;
 	public State getState() { return _status; }
 	
-	private Map<Class,Boolean> _childrenStarted;
+	private Map<AbstractManagedListener,Boolean> _childrenStarted;
 	
 	
 	private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -83,14 +83,14 @@
     	_eprCategoryName= _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
         _eprName		= _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);        
 
-        String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+        String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
         if (!Util.isNullString(attr))
         	try { _maxThreads = Integer.parseInt(attr); }
         	catch (Exception e) { attr = null; }
         if (Util.isNullString(attr))
         {
         	_maxThreads = _defaultMaxThreads;
-        	_logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+        	_logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS_TAG+" attribute"
         			+" - Using default value of <"+_maxThreads+">");
         }
     } //________________________________
@@ -108,16 +108,6 @@
 	    	_controller.unRegister(_eprCategoryName, _eprName, _epr);    	    	
     } //________________________________
     
-    public boolean ensureThreadAvailable()
-    {
-    	while (  (_qRunningThreads >= _maxThreads)
-    			&&_controller.continueLooping())
-    		try { Thread.sleep(_pauseLapseInMillis); }
-    		catch (InterruptedException e) { return false; }
-    	
-		return _controller.continueLooping();
-    } //________________________________
-
     /**
      * Wait until the registration process finished, a PickupCourier was obtained
      * , and the pool thread is instantiated
@@ -138,6 +128,11 @@
 			updateThreadCount((Integer)arg);
 	} //________________________________
 	
+	public boolean hasActiveTreads()
+	{
+		return _qRunningThreads > 0;
+	} //________________________________
+	
 	protected void resetThreadCount()
 	{
 		synchronized (_synchThreads) 
@@ -148,7 +143,7 @@
 	{
 		synchronized (_synchThreads) 
 			{ _qRunningThreads += i.intValue();}
-        _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
+//        _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
 	} //________________________________
 	
     protected String obtainAttribute(String p_sAtt, String p_sDefault)
@@ -161,7 +156,7 @@
 	
 		return (null != sVal) ? sVal : p_sDefault;
 	} // ________________________________
-	
+	    
     /**
      * Loops until controlling process determines
      * <br/>Invokes appropriate Courier to obtain incoming ESB Messages
@@ -184,8 +179,13 @@
 	    	while (_controller.continueLooping())
 	        {
 	    		// Only pickup a message when a thread is available
-	    		if (! ensureThreadAvailable())
-	    			break;
+	    		if (_qRunningThreads >= _maxThreads)
+	    		{
+//	    			_logger.debug("+++++++++++"+Thread.currentThread()+_qRunningThreads);
+	    			try { Thread.sleep(_pauseLapseInMillis); }
+	    			catch (InterruptedException e) { break;}
+	    			continue;
+	    		}
 	    		
 	    		long lWait = _controller.millisToWait();
 

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -22,7 +22,6 @@
 package org.jboss.soa.esb.listeners;
 
 import java.text.SimpleDateFormat;
-import java.util.List;
 import java.util.Observer;
 
 import org.jboss.soa.esb.addressing.EPR;
@@ -75,22 +74,4 @@
 	 */
 	public abstract void unRegister(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException;
 
-	/**
-	 * Obtain an EPR under requested category and name
-	 * 
-	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
-	 * @param serviceName         - name of the service ("
-	 * @throws RegistryException
-	 */
-	public abstract EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException;
-
-	/**
-	 * Obtain a list of EPRs under requested category and name
-	 * 
-	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
-	 * @param serviceName         - name of the service ("
-	 * @throws RegistryException
-	 */
-	public abstract List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException;
-
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -27,8 +27,12 @@
 
 public class ListenerTagNames 
 {
+	/** EPRs */
+	public static final String URL_TAG						= "URL";
+	public static final String PROTOCOL_TAG					= "protocol";
+
 	/** Threading */
-	public static final String MAX_THREADS					= "maxThreads";
+	public static final String MAX_THREADS_TAG				= "maxThreads";
 
 	/** Registry */
     public static final String SERVICE_CATEGORY_NAME_TAG    = "service-category";

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -1,6 +1,24 @@
 package org.jboss.soa.esb.listeners;
 
+import java.io.File;
+import java.net.URL;
+import java.util.Collection;
+
 import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.FTPEpr;
+import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.NamingContext;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
 
 public class ListenerUtil 
 {
@@ -23,9 +41,7 @@
 		ListenerManager manager = ListenerManagerFactory.getInstance().getListenerManager(parametersName);
 		if (inNewThread)
 		{
-			_logger.debug(Thread.currentThread()+ " Waiting for all child listeners to start");
-			manager.waitUntilReady();
-			_logger.debug(Thread.currentThread()+ " All child listeners ready");
+			new Thread(manager).start();
 		}
 		else
 		{
@@ -38,5 +54,194 @@
 		return manager;
 	} //________________________________
 	
+	public static EPR tryToDeliver(Message message, String category, String name) 
+		throws Exception
+	{
+		Courier courier = null;
+		Registry reg = RegistryFactory.getRegistry();
+		Collection<EPR> all = reg.findEPRs(category, name);
+		if (null!=all)
+			for (EPR epr : all)
+			{
+				try
+				{
+					courier = CourierFactory.getCourier(epr);
+					if (!courier.deliver(message))
+						continue;
+					return epr;
+				}
+				finally { CourierUtil.cleanCourier(courier); }
+			}
+		String service = "["+category+","+name+"]";
+		String txt = (null==all || all.size()<1)
+			? "No EPRs registered for "+service 
+			: "Unable to deliver message to registered EPRs for "+service
+			;
+		throw new Exception(txt);
+	} //________________________________
+	
+	public static EPR assembleEpr(ConfigTree tree)
+    	throws ConfigurationException
+    {
+    	String protocol = tree.getAttribute(ListenerTagNames.PROTOCOL_TAG);
+    	ConfigTree	eprElement = tree.getFirstChild(protocol+"EPR");
+
+    	try
+    	{
+    		if ("jms"	.equals(protocol))  return jmsEprFromElement(eprElement);
+    		if ("file"	.equals(protocol))  return fileEprFromElement(eprElement);
+    	}
+    	catch (Exception e) 
+    	{ 
+    		_logger.error("Problem",e);
+    		throw new ConfigurationException("xx",e); 
+    	}
+    	
+    	throw new ConfigurationException("Unknown protocol <"+protocol+">");
+    	
+    } //________________________________
+	
+	public static JMSEpr jmsEprFromElement(ConfigTree tree)
+		throws ConfigurationException
+	{
+		try
+		{
+			
+	        String name		= tree.getAttribute(JMSEpr.DESTINATION_NAME_TAG,null);
+
+	        String type		= getAttrAndWarn(tree, JMSEpr.DESTINATION_TYPE_TAG,"queue");			
+			String jndiURL 	= getAttrAndWarn(tree, JMSEpr.JNDI_URL_TAG	,"localhost");
+	        String jndiContextFactory 
+	        	= getAttrAndWarn(tree,JMSEpr.JNDI_CONTEXT_FACTORY,NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY);
+	        String jndiPkgPrefix 
+	        	=getAttrAndWarn(tree,JMSEpr.JNDI_PKG_PREFIX_TAG,NamingContext.JBOSS_URL_PKG_PREFIX);
+	        String jmsFactoryClass 
+	        	= getAttrAndWarn(tree,JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+	        
+	        String selector = tree.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+	        if (Util.isNullString(selector))
+	        	_logger.debug("No value specified for "+JMSEpr.MESSAGE_SELECTOR_TAG+" attribute"
+	        			+" -  All messages in queue <"+name+"> will be picked up by listener");
+	
+	        JMSEpr epr = new JMSEpr(type,name,jmsFactoryClass
+	        				,jndiURL,jndiContextFactory,jndiPkgPrefix,selector);
+	
+			return epr;
+		}
+		catch (Exception e) { throw new ConfigurationException(e); }
+	} //________________________________
+	
+
+	public static FileEpr fileEprFromElement(ConfigTree tree)
+		throws Exception
+	{
+			URL url = new URL(tree.getAttribute(ListenerTagNames.URL_TAG,null));
+			String protocol = url.getProtocol();
+			
+			if ("file".equals(protocol))
+			{
+				if (!new File(url.getFile()).isDirectory())
+					throw new ConfigurationException("Attribute "
+						+ListenerTagNames.URL_TAG+" must reference a directory");
+			}
+
+			FileEpr epr = 
+					 ("file".equals(protocol)) 	? new FileEpr(url)
+					:("ftp".equals(protocol)) 	? new FTPEpr (url)
+					: null
+			;
+			if (null==epr)
+				throw new ConfigurationException("Unsupported file protocol : "+protocol);
+			
+			epr.setInputSuffix(tree.getAttribute(FileEpr.INPUT_SUFFIX_TAG,null));
+			
+			boolean bErrorDel = Boolean.parseBoolean
+						(getAttrAndWarn(tree,FileEpr.ERROR_DEL_TAG,"true"));
+			epr.setErrorDelete(bErrorDel);
+
+			String errorDir 	= tree.getAttribute(FileEpr.ERROR_DIR_TAG);
+			String errorSuffix	= tree.getAttribute(FileEpr.ERROR_SUFFIX_TAG);
+			if (bErrorDel)
+			{
+				if (null!= errorDir || null!=errorSuffix)
+					_logger.warn("If you don't specify "+FileEpr.ERROR_DEL_TAG+"'false' ,"
+							+FileEpr.ERROR_DIR_TAG+" and "+FileEpr.ERROR_SUFFIX_TAG
+							+" will have no effect because files in error will be deleted");
+			}
+			if (null==errorDir)
+			{
+				errorDir = url.getFile();
+				warnDefault(FileEpr.ERROR_DIR_TAG, errorDir);
+			}
+			if (null==errorSuffix)
+			{
+				errorSuffix = ".esbERROR";
+				warnDefault(FileEpr.ERROR_SUFFIX_TAG, errorSuffix);
+			}
+			
+			
+			boolean bPostDel = Boolean.parseBoolean
+					(getAttrAndWarn(tree,FileEpr.POST_DEL_TAG,"false"));
+			epr.setPostDelete(bPostDel);
+
+			String postDir 	= tree.getAttribute(FileEpr.POST_DIR_TAG);
+			String postSuffix	= tree.getAttribute(FileEpr.POST_SUFFIX_TAG);
+			if (bPostDel)
+			{
+				if (null!= postDir || null!=postSuffix)
+					_logger.warn("If you specify "+FileEpr.POST_DEL_TAG+"'true' ,"
+							+FileEpr.POST_DIR_TAG+" and "+FileEpr.POST_SUFFIX_TAG
+							+" will have no effect because processed input messages will be deleted");
+			}
+			if (null==postDir)
+			{
+				postDir = url.getFile();
+				warnDefault(FileEpr.POST_DIR_TAG, postDir);
+			}
+			if (null==postSuffix)
+			{
+				postSuffix = ".esbDONE";
+				warnDefault(FileEpr.POST_SUFFIX_TAG, postSuffix);
+			}
+			
+			if (epr instanceof FTPEpr)
+			{
+				FTPEpr ftp = (FTPEpr)epr;
+				ftp.setUserName(getAttrAndWarn(tree, FTPEpr.USERNAME_TAG, null));
+				ftp.setPassword(getAttrAndWarn(tree, FTPEpr.PASSWORD_TAG, ""));
+				ftp.setPassive(Boolean.valueOf(getAttrAndWarn(tree,FTPEpr.PASSIVE_TAG,"false")));
+			}
+			return epr;
+	} //________________________________
+	
+	
+	public static String getAttrAndWarn(ConfigTree tree, String tag,String defaultValue)
+		throws ConfigurationException
+	{
+		String value = null;
+		try
+		{
+			value = tree.getAttribute(tag);
+			if (null==value)
+				if (null==defaultValue)
+					throw new ConfigurationException("Missing or invalid "+tag+" attribute");
+				else
+				{
+					warnDefault(tag,defaultValue);
+					value = defaultValue;
+				}
+			return value;
+		}
+		catch (Exception e) { throw new ConfigurationException(e); }
+	} //________________________________
+	
+	private static final boolean LOGWARN=false;
+	private static void warnDefault(String tag, String defaultValue)
+	{
+		if (LOGWARN)
+		_logger.debug("No value specified for "+tag+" attribute"
+				+" -  Using default value: '"+defaultValue+"'");
+	} //________________________________
+	
 	private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -81,12 +81,12 @@
 
             	for (String currMethod : saMethodList)
             	{
+//                	_logger.debug("Attempting to invoke "+_currentClass.getName()+" method "+currMethod);
 	            	Method method = _currentClass.getMethod(currMethod,new Class[] {Message.class});
 	            	
 	            	// The processing result of each action feeds into the processing of the next action...
 	                try 
 	                {
-	                	_logger.debug("Invoking "+_currentClass.getName()+" method "+method.toString());
 	                	Message next = (Message)method.invoke(_currentProcessor,new Object[] {_message} );
 	                    if(next==null)
 	                    {
@@ -130,8 +130,8 @@
         finally 
         {
         	processingComplete = true;
+        	setChanged();
         	notifyObservers(new Integer(-1));
-        	setChanged();
         }
     }
     

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -22,17 +22,17 @@
 
 package org.jboss.soa.esb.listeners.message;
 
-import java.lang.reflect.Method;
-
 import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.couriers.CourierUtil;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.AbstractManagedListener;
 import org.jboss.soa.esb.listeners.ListenerManager;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
@@ -72,10 +72,16 @@
     {
     	super.checkMyParms();
 
+    	// make sure a protocol was specified
+    	obtainAttribute(ListenerTagNames.PROTOCOL_TAG, null);
+    	
     	if (Util.isNullString(_eprCategoryName))
     		throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
     	if (Util.isNullString(_eprName))
     		throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_NAME_TAG);
+    	
+    	_epr = ListenerUtil.assembleEpr(_config);
+//    	_logger.debug(EPRHelper.toXMLString(_epr));
     } // ________________________________
     
 	@Override
@@ -83,34 +89,6 @@
 	
 	
 	@Override
-	public boolean finalizeRun() 
-	{
-    	try 
-    	{ 
-        	if (null!=_pickUpCourier)
-    	    	try
-    	    	{
-    	    		Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
-    	    		cleanMethod.invoke(_pickUpCourier, new Object[] {});
-    	    	}
-    	    	catch (NoSuchMethodException e) {/*  OK  Just don't invoke it  */  }
-    	    	catch (Exception e)
-    	    	{
-    	    		_logger.error("Problems invoking courier.clean() Method",e);
-    	    	}
-
-    	    unregisterProcess();
-    		return true; 
-    	}
-	   	catch (RegistryException re) 
-	   	{
-	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
-	   		return false;
-	   	}
-
-	} //________________________________
-
-	@Override
 	public boolean initializeRun() 
 	{
         try   
@@ -128,6 +106,23 @@
 	} //________________________________
 
 	@Override
+	public boolean finalizeRun() 
+	{
+    	try 
+    	{
+    		CourierUtil.cleanCourier(_pickUpCourier);
+    	    unregisterProcess();
+    		return true; 
+    	}
+	   	catch (RegistryException re) 
+	   	{
+	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
+	   		return false;
+	   	}
+
+	} //________________________________
+
+	@Override
 	public void waitForEventAndProcess(long maxWaitMillis) 
 	{
 		Message message = null;

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.newgateway;
+
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
+import org.jboss.soa.esb.listeners.ListenerManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Esb abstract Gateway listener.
+ * <p/> Transport specific gateways must extend this class
+ * <br/>Keeps a thread pool to instantiate composer classes when an incoming event is received 
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+
+public abstract class GatewayListener extends AbstractManagedListener
+{
+	/**
+	 * public constructor
+	 * @param controller ListenerManager - the controlling process
+	 * @param config ConfigTree - Containing 'static' configuration for this instance
+	 * @throws ConfigurationException
+	 */
+    public GatewayListener(ListenerManager controller, ConfigTree config) 
+    	throws ConfigurationException 
+    {
+    	super (controller, config);
+    } // _______________________________
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws ConfigurationException -
+     *             if mandatory atts are not right or actionClass not in
+     *             classpath
+     */
+    protected void checkMyParms() throws ConfigurationException 
+    {
+    	super.checkMyParms();
+    	_targetServiceCategory	= obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
+        _targetServiceName			= obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);        
+    } // ________________________________
+    
+	@Override
+	public boolean isMessageAware() { return false; }
+	
+	
+	@Override
+	public boolean finalizeRun() 
+	{
+    	try 
+    	{
+    		CourierUtil.cleanCourier(_deliverCourier);
+    	    unregisterProcess();
+    		return true; 
+    	}
+	   	catch (RegistryException re) 
+	   	{
+	   		_logger.warn("Could not un register service " + re.getLocalizedMessage());
+	   		return false;
+	   	}
+
+	} //________________________________
+
+	@Override
+	public boolean initializeRun() 
+	{
+        try   
+        {
+        	if (! Util.isNullString(_eprCategoryName))
+        		registerProcess();
+        	return true;
+        }
+		catch (Exception re) 
+		{ 
+			_logger.fatal("Could not register service " + re.getLocalizedMessage(),re);
+			return false;
+		}
+		
+	} //________________________________
+	
+	protected String            _targetServiceCategory;
+    protected String			_targetServiceName;
+    protected EPR				_targetEpr;
+
+	protected DeliverOnlyCourier _deliverCourier;
+} 

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -31,6 +31,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +40,8 @@
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -50,9 +53,7 @@
 {
 	public static final String CORRELATION_ID_TAG = "messageCorrelationId";
 
-	private CourierUtil()
-	{
-	}
+	private CourierUtil() {}
 
 	public static List<KeyValuePair> propertiesFromSelector(String selector)
 			throws Exception
@@ -233,5 +234,26 @@
 		courier.deliver(message);
 	}
 
+    public static void cleanCourier (PickUpOnlyCourier courier) { cleanableCleanup(courier);}
+    public static void cleanCourier (DeliverOnlyCourier courier){ cleanableCleanup(courier);}
+    public static void cleanCourier (TwoWayCourier courier)		{ cleanableCleanup(courier);}
+	    
+    private static void cleanableCleanup (Object courier)
+    {
+    	if (null!=courier)
+	    	try
+	    	{
+	    		Method cleanMethod = courier.getClass().getMethod("cleanup", new Class[] {});
+	    		cleanMethod.invoke(courier, new Object[] {});
+	    	}
+	    	catch (NoSuchMethodException e) {/*  OK  Just don't invoke it  */  }
+	    	catch (Exception e)
+	    	{
+	    		_logger.error("Problems invoking clean() Method for class "
+	    				+courier.getClass().getSimpleName(),e);
+	    	}
+    } //________________________________
+	
+
 	protected static Logger _logger = Logger.getLogger(CourierUtil.class);
 }

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java	2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java	2006-12-16 09:40:54 UTC (rev 8363)
@@ -134,6 +134,22 @@
 		return (null==_attributes) ? null : _attributes.get(name);
 	} // _______________________________
 	/**
+	 * retrieve the value assigned to an attribute key
+	 * @param name String - the search key
+	 * @param defaultValue String - the default value to return if attribute is not set
+	 * @return String - the value assigned to the specified key, or defaultValue when the key is unset (an Exception is thrown if both are null)
+	 */
+	public String getAttribute(String name, String defaultValue)
+		throws Exception
+	{
+		String ret = (null==_attributes) ? null : _attributes.get(name);
+		if (null!=ret)
+			return ret;
+		if (null!=defaultValue)
+			return defaultValue;
+		throw new Exception("Invalid or missing <"+name+"> attribute ");
+	} // _______________________________
+	/**
 	 * obtain the list of all attribute names
 	 * @return Set<String>  - the set of keys that have been assigned a non null value
 	 */




More information about the jboss-svn-commits mailing list