[jboss-svn-commits] JBL Code SVN: r7457 - in labs/jbossesb/trunk/product/core/listeners: src/org/jboss/soa/esb/listeners src/org/jboss/soa/esb/listeners/gateway tests/src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/listeners/gateway

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Nov 7 19:12:37 EST 2006


Author: estebanschifman
Date: 2006-11-07 19:12:30 -0500 (Tue, 07 Nov 2006)
New Revision: 7457

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
New FileGatewayListener class and unit test

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-11-08 00:12:30 UTC (rev 7457)
@@ -9,6 +9,7 @@
 	public static final String EPR_DESCRIPTION_TAG          = "epr-description";
 
 	/**  Gateways  */
+    public static final String TARGET_SERVICE_CATEGORY_TAG	= "target-service-category";
 	public static final String TARGET_SERVICE_NAME_TAG	    = "target-service-name";
     
 	/** ActionProcessingPipeline */
@@ -26,6 +27,9 @@
 
     /** RuleSet reference (Filename) */
     public static final String RULE_SET_TAG                 = "ruleSet";
+
+    /** Poller tagnames   */
+    public static final String POLL_LATENCY_SECS_TAG		= "pollLatencySeconds";
     
     /** File Actions  */
     public static final String FILE_INPUT_DIR_TAG			= "inputDir";

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2006-11-08 00:12:30 UTC (rev 7457)
@@ -0,0 +1,346 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+import org.jboss.soa.esb.ConfigurationException;
+
+/**
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+public class FileGatewayListener implements Runnable
+{
+
+    public FileGatewayListener(GatewayListenerController commandListener, ConfigTree config) 
+    	throws Exception 
+    {
+    	_config		= config;
+    	_controller	= commandListener;
+    	_sleepBetweenPolls = 10000;			//  milliseconds
+        checkMyParms();
+    } // __________________________________
+
+	public void run() 
+	{
+		if (null!=_serviceName)
+			try {	_controller.register(_config,_myEpr); }
+			catch (RegistryException e1) 
+				{	_logger.warn("unable to register service",e1); }
+
+		boolean bSleep = false;
+		while (_controller.continueLooping()) 
+        {
+			//  only sleep in between
+			if (bSleep)
+				try { Thread.sleep(_sleepBetweenPolls); }
+				catch (InterruptedException e) { return; }
+			else
+				bSleep = true;
+
+			for (File fileIn : _inputDirectory.listFiles(_fileFilter))
+            {
+        		// Try to rename - if unsuccessful, somebody else got it first
+        		File fileWork = new File(fileIn.getParent(),fileIn.getName()+_workingSuffix);
+        		if (! fileIn.renameTo(fileWork))
+        			continue;
+
+        		Throwable thrown = null;
+        		String text = null;
+	            try
+	            {
+	            	Object obj = _processMethod.invoke(_composer,new Object[] {fileWork} );
+	        		if (null==obj)
+	        		{
+	        			_logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+	        			continue;
+	        		}
+	        		_courier.deliver((org.jboss.soa.esb.message.Message)obj);
+	            }
+
+		            catch (InvocationTargetException e)	
+		            {
+		            	thrown = e;
+		            	text = "Problems invoking method <"+_processMethod.getName()+">";
+		            	
+		            }
+		            catch (IllegalAccessException e)	
+		            {	
+		            	thrown = e;
+		            	text = "Problems invoking method <"+_processMethod.getName()+">";
+		            }
+	        		catch (ClassCastException e)
+	        		{
+	        			thrown = e;
+	        			text = "Action class method <"+_processMethod.getName()+"> returned a non Message object";
+	        		}
+	        		catch (CourierException e)
+	        		{
+	        			thrown = e;
+	        			text = "Courier <"+_courier.getClass().getName()+".deliver(Message) FAILED";
+	        		}
+        		
+        		if (null==thrown)
+        		{
+            		File fileOK		=  new File(_postProcessDirectory,fileIn.getName()+_postProcessSuffix);
+        			if (_deleteAfterOK)
+        				fileWork.delete();
+        			else
+        			{
+        				fileOK.delete();
+        				fileWork.renameTo(fileOK);
+        			}
+        		}
+        		else
+        		{
+        			thrown.printStackTrace();
+        			_logger.error(text,thrown);
+            		File fileError	=  new File(_errorDirectory,fileIn.getName()+_errorSuffix);
+        			fileWork.renameTo(fileError);
+        		}
+            }
+        }
+        
+		if (null!=_serviceName)
+			try { _controller.unRegister(_serviceCategory, _serviceName,_myEpr); }
+			catch (RegistryException e1){	_logger.warn("unable to unRegister service",e1); }
+
+    } // ________________________________
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws Exception -
+     *             if mandatory atts are not right or actionClass not in
+     *             classpath
+     */
+    protected void checkMyParms() throws Exception 
+    {
+        // Third arg is null - Exception will be thrown if attribute is not found
+    	_targetServiceCategory	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+    	_targetServiceName	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+    	_targetEpr		= _controller.getEprByName(_targetServiceName);
+    	if (null==_targetEpr)
+        	throw new ConfigurationException("EPR <"+_targetServiceName+"> not found in registry");
+        _courier		= CourierFactory.getCourier(_targetEpr);
+
+        // Polling interval
+        String sAux = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+        if (! Util.isNullString(sAux))
+	        try { _sleepBetweenPolls = 1000 * Long.parseLong(sAux); }
+	        catch (NumberFormatException e)
+	        	{ _logger.warn("Invalid poll latency - keeping default of "+(_sleepBetweenPolls/1000)); }
+        
+        resolveComposerClass();
+
+	//  INPUT directory and suffix  (used for FileFilter)
+	  String sInpDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_INPUT_DIR_TAG,null);
+      _inputDirectory = fileFromString(sInpDir);
+      seeIfOkToWorkOnDir(_inputDirectory);
+
+      _inputSuffix  = _controller.obtainAtt(_config,ListenerTagNames.FILE_INPUT_SFX_TAG,null);
+      _inputSuffix  = _inputSuffix.trim();
+      if (_inputSuffix.length()<1)
+    	  throw new Exception ("Invalid "+ListenerTagNames.FILE_INPUT_SFX_TAG+" attribute");
+	  _fileFilter = new FileEndsWith(_inputSuffix);
+
+	//  WORK suffix (will rename in input directory)
+      _workingSuffix	= _controller.obtainAtt(_config,ListenerTagNames.FILE_WORK_SFX_TAG,".esbWork").trim();
+      if (_workingSuffix.length()<1)
+    	  throw new Exception ("Invalid "+ListenerTagNames.FILE_WORK_SFX_TAG+" attribute");
+      if (_inputSuffix.equals(_workingSuffix))
+    	  throw new Exception("Work suffix must differ from input suffix <"+_workingSuffix+">");
+
+    //    ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
+      String sErrDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_ERROR_DIR_TAG,sInpDir);
+      _errorDirectory = fileFromString(sErrDir);
+      seeIfOkToWorkOnDir(_errorDirectory);
+
+      _errorSuffix  = _controller.obtainAtt(_config,ListenerTagNames.FILE_ERROR_SFX_TAG,".esbError").trim();
+      if (_errorSuffix.length()<1)
+    	  throw new Exception ("Invalid "+ListenerTagNames.FILE_ERROR_SFX_TAG+" attribute");
+      if (_errorDirectory.equals(_inputDirectory) && _inputSuffix.equals(_errorSuffix))
+    	  throw new Exception("Error suffix must differ from input suffix <"+_errorSuffix+">");
+
+
+   //    Do users wish to delete files that were processed OK ?
+      String sPostDel = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_DEL_TAG,"false").trim();
+      _deleteAfterOK = Boolean.parseBoolean(sPostDel);
+      if (_deleteAfterOK)
+    	  return;
+
+    //    POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
+      String sPostDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_DIR_TAG,sInpDir);
+      _postProcessDirectory = fileFromString(sPostDir);
+      seeIfOkToWorkOnDir(_postProcessDirectory);
+      _postProcessSuffix  = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_SFX_TAG,".esbDone").trim();
+      if (_postProcessDirectory.equals(_inputDirectory))
+      {	if (_postProcessSuffix.length()<1)
+    	  throw new Exception ("Invalid "+ListenerTagNames.FILE_POST_SFX_TAG+" attribute");
+      	if (_postProcessSuffix.equals(_inputSuffix))
+    	  throw new Exception("Post process suffix must differ from input suffix <"+_postProcessSuffix+">");
+      }
+    } //________________________________
+    
+    protected void resolveComposerClass() throws Exception
+    {
+        // Look for first "action" element - only first one will be used
+        String tagName = ListenerTagNames.ACTION_ELEMENT_TAG;
+        ConfigTree actionElement = _config.getFirstChild(tagName);
+        String sProcessMethod =  null;
+        if (null!=actionElement)
+        {	// class attribute
+            _composerName	= _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_TAG,null);
+            _composerClass = Class.forName(_composerName);
+        	Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+        	_composer= oConst.newInstance(_config);
+        	tagName	= ListenerTagNames.PROCESS_METHOD_TAG;
+        	sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+        }
+        else
+        {
+        	_composerName = PackageFileContents.class.getName();
+        	_composerClass= PackageFileContents.class;
+        	_composer	= new PackageFileContents();
+        	sProcessMethod = "process";
+        }
+
+    	_processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {Object.class});
+    } //________________________________
+
+    protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
+	{
+      if (! p_oDir.exists())   
+    	  throw new Exception ("Directory "+p_oDir.toString()+" not found");
+      if (!p_oDir.isDirectory())
+    	  throw new Exception(p_oDir.toString()+" is not a directory");
+      if (!p_oDir.canRead())
+    	  throw new Exception("Can't read directory "+p_oDir.toString());
+      if (! p_oDir.canWrite()) 
+    	  throw new Exception ("Can't write/rename in directory "+p_oDir.toString());
+	} //________________________________
+	
+    private File fileFromString(String file) 
+    {
+        try {	return new File(new URI(file)); } 
+        catch(Exception e) { return new File(file); }
+    } //________________________________
+
+    private class FileEndsWith implements FileFilter
+    {
+      String m_sSuffix;
+      FileEndsWith(String p_sEnd) throws Exception
+      {
+        m_sSuffix = p_sEnd;
+        if (Util.isNullString(m_sSuffix))
+          throw new Exception("Must specify file extension");
+      } //______________________________
+
+      public boolean accept(File p_f)
+      {	return (p_f.isFile())
+        	? p_f.toString().endsWith(m_sSuffix)
+        	: false;
+      } //______________________________
+    } //____________________________________________________
+    
+/**
+ * Default gateway action for files
+ * <p/>It will just drop the file contents into a Message
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+    private static class PackageFileContents
+    {
+    	public Message process (Object obj) throws Exception
+    	{
+    		if (! (obj instanceof File))
+    			throw new Exception ("Object must be instance of File");
+
+    		Message message = MessageFactory.getInstance().getMessage();
+    		message.getBody().setContents(getFileContent((File)obj));    		
+    		return message;
+    	}
+    	
+    	private byte[] getFileContent(File file) throws Exception
+    	{
+    		ByteArrayOutputStream out = new ByteArrayOutputStream();
+    		byte[] ba = new byte[1000];
+    		int iQread;
+
+    		FileInputStream inp = new FileInputStream(file);
+    		while (-1!= (iQread=inp.read(ba)))
+    			if (iQread > 0)
+    				out.write(ba,0,iQread);
+    		inp.close();
+
+    		out.close();
+    		return out.toByteArray();
+    	}
+    } //____________________________________________________
+
+    protected final static Logger _logger = Logger.getLogger(FileGatewayListener.class);
+
+    protected ConfigTree 		_config;
+    protected GatewayListenerController _controller;
+    protected long 				_sleepBetweenPolls;   //  milliseconds
+
+    protected String			_serviceCategory, _serviceName;
+    protected String			_targetServiceCategory ,_targetServiceName;
+    protected EPR				_myEpr		,_targetEpr;
+
+    protected String			_composerName;
+    protected Class 			_composerClass;
+    protected Object			_composer;
+    protected Method			_processMethod;
+    
+    protected Courier			_courier;
+    
+    protected boolean			_deleteAfterOK;
+    protected File				_inputDirectory		,_errorDirectory	,_postProcessDirectory;
+    protected String			_inputSuffix		,_postProcessSuffix
+    							,_workingSuffix		,_errorSuffix
+    							;
+    protected FileFilter		_fileFilter;
+} //____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java	2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java	2006-11-08 00:12:30 UTC (rev 7457)
@@ -33,14 +33,17 @@
 import org.jboss.internal.soa.esb.command.CommandQueue;
 import org.jboss.internal.soa.esb.command.CommandQueueException;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.util.EPRManager;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
 import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.parameters.ParamRepositoryException;
 import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
 import org.jboss.soa.esb.util.Util;
 import org.xml.sax.SAXException;
 
@@ -104,11 +107,8 @@
 			return m_oException;
 		}
 	};
-
-    private ActionDefinitionFactory actionDefinitionFactory;
-
 	/**
-	 * Package pivate default constructor. 
+	 * Package private default constructor. 
 	 */
 	protected GatewayListenerController() { }
 	
@@ -213,13 +213,6 @@
 		_endTime = (null == sEndT) ? Long.MAX_VALUE : _dateFormat.parse(
 				sEndT).getTime();
 
-        // Read and initialise the action definitions...
-        ConfigTree actionConfig = p_oP.getFirstChild("Actions");
-        if(actionConfig == null) {
-            throw new ConfigurationException("No 'Actions' configuration.");
-        }        
-        actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
-        
 	} // ________________________________
 
     /**
@@ -265,12 +258,8 @@
 		while (endNotRequested()) 
 		{
 			_status = State.Running;
-			for (ConfigTree oCurr : _config.getAllChildren()) {
-				String sClass = oCurr.getAttribute(GATEWAY_CLASS_TAG);
-				if (Util.isNullString(sClass))
-					continue;
-				tryToLaunchGateway(oCurr, sClass);
-			}
+			for (ConfigTree oCurr : _config.getAllChildren())
+				tryToLaunchGateway(oCurr);
 
 			waitForCmdOrSleep();
 
@@ -301,24 +290,30 @@
 				.info("Finishing_____________________________________________________");
 
 		// Close the command queue...
-		try {
-			_commandQueue.close();
+		try 
+		{
+			if (null!=_commandQueue)
+				_commandQueue.close();
 		} catch (CommandQueueException e) {
 			_logger.error("Error closing Command Queue.", e);
 		}
 	} // ________________________________
 
-	private void tryToLaunchGateway(ConfigTree p_oP, String p_sClassName) 
+	private void tryToLaunchGateway(ConfigTree p_oP) 
 	{
+		String sClass = p_oP.getAttribute(GATEWAY_CLASS_TAG);
+		if (Util.isNullString(sClass))
+			return;
+
 		try {
-			Class oListener = Class.forName(p_sClassName);
-			Constructor oConst = oListener.getConstructor(new Class[] {
-					this.getClass(), ConfigTree.class, ActionDefinitionFactory.class });
-			Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
-					p_oP, actionDefinitionFactory });
+			Class oListener = Class.forName(sClass);
+			Constructor oConst = oListener.getConstructor(new Class[]
+					{this.getClass(), ConfigTree.class});
+			Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this, p_oP});
 			new Thread(oRun).start();
 		} catch (Exception e) {
-			_logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+			_logger.error("Cannot launch <" + sClass + ">\n", e);
+			e.printStackTrace();
 		}
 	} // ________________________________
 
@@ -327,6 +322,7 @@
 	} // ________________________________
 
 	private void waitForCmdOrSleep() {
+
 		long lToGo = millisToWait();
 
 		if (null == _commandQueue) {
@@ -334,14 +330,13 @@
 			// No command queue nor topic - Just sleep until time
 			// exhausted, or thread interrupted
 			try {
-				if (lToGo > 0)
-					Thread.sleep(lToGo);
+				while ((lToGo=millisToWait()) > 0)
+					Thread.sleep(500);
 			} catch (InterruptedException e) {
 				_endTime = 0; // mark as end requested and return
 			}
 			return;
 		}
-
 		// Wait for commands until time exhausted or command received
 		// Note that received commands might change time variables (reload/end)
 		// that's why time to go is recalculated on each cycle
@@ -436,6 +431,10 @@
 		return _endRequested || System.currentTimeMillis() >= _endTime;
 	}
 
+	public void requestEnd() {
+		_endRequested=true;
+		_endTime = 0;
+	}
 	/**
 	 * Accessor to determine if execution time is not expired, and no shutdown
 	 * request received
@@ -507,23 +506,72 @@
 		return getEprManager().loadEPR(serviceName);
 	} // ________________________________
 
-	public void register (String serviceName, EPR address)
+
+	/**
+	 * @deprecated use register (ConfigTree config, EPR address) instead.
+	 * @param name
+	 * @param address
+	 */
+	public void register (String name, EPR address)
 	{
-		try { getEprManager().saveEPR(serviceName,address); }
+		try { getEprManager().saveEPR(name,address); }
 		catch (IOException e)
 		{
 			_logger.fatal("Cannot register service",e);
 		}
 	} // ________________________________
-
-	public void unRegister (String serviceName, EPR address)
+	/**
+	 * @deprecated use unRegister (String serviceCategoryName, String serviceName, EPR epr) instead.
+	 * @param name
+	 */
+	public void unRegister (String name)
 	{
-		try { getEprManager().removeEPR(serviceName); }
+		try { getEprManager().removeEPR(name); }
 		catch (IOException e)
 		{
 			_logger.fatal("Cannot un-register service",e);
 		}
 	} // ________________________________
+	/**
+	 * Register an EPR in the registry.
+	 * 
+	 * @param config - a config tree containing the deployment-configuration of the service.
+	 * @param epr - the epr (EndPoint Reference) of the service.
+	 * 
+	 * @throws RegistryException
+	 */
+	public void register(ConfigTree config , EPR epr) throws RegistryException
+	{
+		String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+		String serviceName         = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+		if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+		{
+				register(serviceName,epr);
+				return;
+		}
+		String serviceDescription  = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+		String eprDescription      = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+		Registry registry = RegistryFactory.getRegistry();
+		registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+	}
+	/**
+	 * Unregister the EPR from the registry.
+	 * 
+	 * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+	 * @param serviceName         - name of the service ("
+	 * @param epr
+	 * @throws RegistryException
+	 */
+	public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+	{
+		if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+		{
+				unRegister(serviceName);
+				return;
+		}
+		Registry registry = RegistryFactory.getRegistry();
+		registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+	}
 
 
 	private 		CommandQueue _commandQueue;

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2006-11-08 00:12:30 UTC (rev 7457)
@@ -45,6 +45,7 @@
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
 
 public class JmsGatewayListener implements Runnable
 {
@@ -61,7 +62,9 @@
 	public void run() 
 	{
 		if (null!=_serviceName)
-			_controller.register(_serviceName,_myEpr);
+			try { _controller.register(_config,_myEpr); }
+			catch (RegistryException e1) 
+			{	_logger.warn("unable to register service",e1); }
 
 		while (_controller.continueLooping()) 
         {
@@ -108,7 +111,9 @@
         }
         
 		if (null!=_serviceName)
-			_controller.unRegister(_serviceName,_myEpr);
+			try { _controller.unRegister(_serviceCategory, _serviceName,_myEpr); }
+			catch (RegistryException e1){	_logger.warn("unable to unRegister service",e1); }
+		
 
 		if (null != _queueSession) 
             try { _queueSession.close(); }
@@ -128,6 +133,7 @@
     protected void checkMyParms() throws Exception 
     {
         // Third arg is null - Exception will be thrown if attribute is not found
+    	_targetServiceCategory	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
     	_targetServiceName	= _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
     	_targetEpr		= _controller.getEprByName(_targetServiceName);
     	if (null==_targetEpr)
@@ -229,7 +235,8 @@
     protected GatewayListenerController _controller;
     protected final long 		_sleepForRetries;   //  milliseconds
 
-    protected String			_serviceName,_targetServiceName;
+    protected String			_serviceCategory	,_serviceName;
+    protected String			_targetServiceCategory	,_targetServiceName;
     protected EPR				_myEpr		,_targetEpr;
 
     protected String			_composerName;

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java	2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java	2006-11-08 00:12:30 UTC (rev 7457)
@@ -0,0 +1,149 @@
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.tests.BaseTest;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.EsbListenerController;
+import org.jboss.soa.esb.message.Message;
+
+public class FileGatewayListenerUnitTest extends BaseTest 
+{
+
+	private static final String TMP_DIR	= System.getProperty("java.io.tmpdir","/tmp"); 
+
+	static File _gatewayConfig		= new File(TMP_DIR,"gatewayConfig.xml");
+	static File _esbListenerConfig 	= new File(TMP_DIR,"esbListenerConfig.xml");
+	static File _returnFile			= new File(TMP_DIR,"messageBack.txt");
+	static File _droppedFile		= new File(TMP_DIR,"legacy.dat");
+	static String POST_SUFFIX 		= ".sentToEsb";
+	static File _doneFile			= new File(_droppedFile.toString()+POST_SUFFIX);
+
+	
+	static String THE_TEXT = "This is the text that will travel around";
+
+	String _appServer		= System.getProperty("unit.test.appserver","localhost");
+	EsbListenerController	_esbListController;
+	GatewayListenerController _gatewayController;
+
+	public void setUp() throws Exception
+	{
+		Logger.getLogger(this.getClass()).info("Writing temp files to "+TMP_DIR);
+    	writeGatewayConfig();
+    	writeEsbListenerConfig();
+    	bytesToFile(_droppedFile, THE_TEXT.getBytes());
+    	_doneFile.delete();
+    	_returnFile.delete();
+	}
+	
+	public void tearDown() throws Exception
+	{
+    	_gatewayConfig.delete();
+    	_esbListenerConfig.delete();
+    	_doneFile.delete();
+    	_returnFile.delete();
+	}
+	
+	public void test_FileGatewayListener() throws Exception 
+    {
+        try 
+        {
+    		_esbListController = new EsbListenerController(_esbListenerConfig.toString());
+    		new Thread(_esbListController).start();
+    		Thread.sleep(2000);
+    		_gatewayController = new GatewayListenerController(_gatewayConfig.toString());
+    		new Thread(_gatewayController).start();
+    		Thread.sleep(4000);
+
+    		_esbListController.requestEnd();
+    		_gatewayController.requestEnd();    
+    		
+    		assertEquals(THE_TEXT, stringFromFile(_returnFile));
+    		assertEquals(true,_doneFile.exists());
+        } 
+        catch (Exception e) { }
+    }
+	
+	public static class MockAction 
+	{
+	    ConfigTree _config;
+	    public MockAction(ConfigTree config) { _config = config; } 
+	    public Message writeToDisk (Message message) throws Exception 
+	    {
+	    	bytesToFile(_returnFile, message.getBody().getContents());
+	    	return message;
+	    } // ________________________________
+
+	} // ___________________________________________________
+
+	void writeGatewayConfig() throws Exception
+	{
+		StringBuilder sb = new StringBuilder()
+		.append("<FileGateway parameterReloadSecs=\"180\">\n")
+		 .append("  <GatewayConfig\n")
+		 .append("     target-service-category=\"eprManager\"\n")
+		 .append("     target-service-name=\"testService\"\n")
+		 .append("     gatewayClass=\"org.jboss.soa.esb.listeners.gateway.FileGatewayListener\"\n")
+		 .append("     pollLatencySeconds=\"5\"\n")
+		 .append("     inputDir=\"").append(TMP_DIR).append("\"\n")
+		 .append("     inputSuffix=\".dat\"\n")
+		 .append("	   postSuffix=\"").append(POST_SUFFIX).append("\"\n")
+		 .append("   >\n")
+		 .append("   </GatewayConfig>\n")
+		 .append("</FileGateway>\n")
+		 ;
+		bytesToFile(_gatewayConfig, sb.toString().getBytes());
+	}
+	
+	void writeEsbListenerConfig() throws Exception
+	{
+		StringBuilder sb = new StringBuilder()
+		.append("<DummyTester parameterReloadSecs=\"180\">\n")
+		.append("   <DummyActionConfig\n")
+		.append("   service-category=\"eprManager\"\n")
+		.append("	service-name=\"testService\"\n")
+		.append("	service-description=\"My Dummy Service Name\"\n")
+		.append("	epr-description=\"EPR descriptionnnnnnnnnnnnnnn\"\n")		
+		.append("  	listenerClass=\"org.jboss.soa.esb.listeners.message.JmsQueueListener\"\n")
+		.append("	connection-factory=\"ConnectionFactory\"\n")
+		.append("	destination-type=\"queue\"\n")
+		.append("	destination-name=\"queue/A\"\n")
+		.append("	jndi-type=\"jboss\"\n")
+		.append("   jndi-URL=\"").append(_appServer).append("\"\n")
+		.append("   message-selector=\"service='test'\"\n")
+		.append("   >\n")
+		.append("	   <action class=\"").append(MockAction.class.getName()).append("\" process=\"writeToDisk\" />\n")
+		.append("   </DummyActionConfig>\n")
+		.append("</DummyTester>\n")
+		;
+		bytesToFile(_esbListenerConfig, sb.toString().getBytes());
+	}
+	
+	static void bytesToFile(File file, byte[] text) throws Exception
+	{
+		OutputStream out = new FileOutputStream(file);
+		out.write(text);
+		out.close();
+	}
+
+	static String stringFromFile(File file) throws Exception
+	{
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+		FileInputStream	inp = new FileInputStream(file);
+		byte[] ba = new byte[1000];
+		int iQ;
+		while (-1 != (iQ=inp.read(ba)))
+			if (iQ > 0)
+				out.write(ba,0,iQ);
+		inp.close();
+		
+		out.close();
+		return out.toString();
+	}
+}




More information about the jboss-svn-commits mailing list