[jboss-svn-commits] JBL Code SVN: r13808 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Jul 27 11:16:12 EDT 2007


Author: tfennelly
Date: 2007-07-27 11:16:11 -0400 (Fri, 27 Jul 2007)
New Revision: 13808

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/XAbstractFileGateway.java
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
Log:
Formatted - so it can be read.

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2007-07-27 13:32:19 UTC (rev 13807)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2007-07-27 15:16:11 UTC (rev 13808)
@@ -39,274 +39,238 @@
 import org.jboss.soa.esb.util.FileUtil;
 import org.jboss.soa.esb.util.Util;
 
-public class FileGatewayListener extends AbstractFileGateway
-{
+public class FileGatewayListener extends XAbstractFileGateway {
     /**
      * serial version uid for this class
      */
     private static final long serialVersionUID = 8457176329093772449L;
 
-    public FileGatewayListener(ConfigTree config) throws ConfigurationException, RegistryException, GatewayException
-	{
-		super(config);
-		
-		/*
-		 * JBESB-454 allowed gateways to pull files with any extension. Obvious
-		 * security issue, but we are explicit about this in the docs and users
-		 * should beware.
-		 */
-		
-		if ((_inputSuffix == null) || (_inputSuffix.equals("")))
-		{
-		    /*
-		     * If no suffix, then inputDir must be different from outputDir
-		     * or we go into an infinite loop. Already checked through
-		     * checkMyParams.
-		     */
-		
-		    _inputFileFilter = null;
-		    
-		    /*
-		     * If no input suffix, then we must have a work suffix and we
-		     * must ignore worker files when sourcing new input files, or
-		     * we end up in an infinite loop.
-		     */
-		    
-		    _ignoreFileFilter = new IgnoreFile(_workingSuffix, _errorSuffix, _postProcessSuffix);
-		}
-		else
-		    _inputFileFilter = new FileEndsWith(_inputSuffix);
-	}
+    public FileGatewayListener(ConfigTree config) throws ConfigurationException, RegistryException, GatewayException {
+        super(config);
 
-	@Override
-	protected void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException
-	{
-		if (!p_oDir.exists())
-			throw new GatewayException("Directory " + p_oDir.toString()
-					+ " not found");
-		if (!p_oDir.isDirectory())
-			throw new GatewayException(p_oDir.toString()
-					+ " is not a directory");
-		if (!p_oDir.canRead())
-			throw new GatewayException("Can't read directory "
-					+ p_oDir.toString());
-		if (!p_oDir.canWrite())
-			throw new GatewayException("Can't write/rename in directory "
-					+ p_oDir.toString());
-	} // ________________________________
+        /*
+           * JBESB-454 allowed gateways to pull files with any extension. Obvious
+           * security issue, but we are explicit about this in the docs and users
+           * should beware.
+           */
 
-	@Override
-	boolean deleteFile(File file) throws GatewayException
-	{
-		return file.delete();
-	}
+        if ((_inputSuffix == null) || (_inputSuffix.equals(""))) {
+            /*
+                * If no suffix, then inputDir must be different from outputDir
+                * or we go into an infinite loop. Already checked through
+                * checkMyParams.
+                */
 
-	@Override
-	byte[] getFileContents(File file) throws GatewayException
-	{
-		try
-		{
-			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			byte[] ba = new byte[1000];  // TODO magic number
-			int iQread;
-			FileInputStream inp = new FileInputStream(file);
-			while (-1 != (iQread = inp.read(ba)))
-				if (iQread > 0)
-					out.write(ba, 0, iQread);
-			inp.close();
-			out.close();
-			return out.toByteArray();
-		}
-		catch (IOException e)
-		{
-			throw new GatewayException(e);
-		}
-	}
+            _inputFileFilter = null;
 
-	@Override
-	File[] getFileList() throws GatewayException
-	{
-	    /*
-	     * JBESB-454 allows no input suffix. This means that we need to
-	     * ignore any worker files, which are written to the same directory,
-	     * but which have a well defined suffix. All other files (errors and
-	     * post processed) go into separate directories, so we don't need
-	     * to be concerned about them.
-	     */
-	    
-	    if (_inputFileFilter != null)
-	    {
-		/*
-		 * Input suffix is not null.
-		 */
-		
-		return _inputDirectory.listFiles(_inputFileFilter);
-	    }
-	    else
-	    {
-		/*
-		 * Input suffix is null so ignore any worker files.
-		 */
-		
-		return _inputDirectory.listFiles(_ignoreFileFilter);
-	    }
-	}
+            /*
+                * If no input suffix, then we must have a work suffix and we
+                * must ignore worker files when sourcing new input files, or
+                * we end up in an infinite loop.
+                */
 
-	@Override
-	boolean renameFile(File from, File to) throws GatewayException
-	{
-		if (to.exists() && !to.delete())
-		{
-			throw new GatewayException("Cannot delete target file: "
-					+ to.getAbsolutePath());
-		}
+            _ignoreFileFilter = new IgnoreFile(_workingSuffix, _errorSuffix, _postProcessSuffix);
+        } else
+            _inputFileFilter = new FileEndsWith(_inputSuffix);
+    }
 
-		return FileUtil.renameTo(from, to) ;
-	}
+    /**
+     * Verifies that the supplied directory can be used by this gateway.
+     * The checks run in a fixed order (exists, is a directory, readable,
+     * writable) and the first one that fails is reported.
+     *
+     * @param p_oDir the directory to validate
+     * @throws GatewayException if the directory is missing, is not a
+     *                          directory, or cannot be read or written
+     */
+    @Override
+    protected void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException {
+        String problem = null;
+
+        if (!p_oDir.exists()) {
+            problem = "Directory " + p_oDir + " not found";
+        } else if (!p_oDir.isDirectory()) {
+            problem = p_oDir + " is not a directory";
+        } else if (!p_oDir.canRead()) {
+            problem = "Can't read directory " + p_oDir;
+        } else if (!p_oDir.canWrite()) {
+            problem = "Can't write/rename in directory " + p_oDir;
+        }
+
+        if (null != problem) {
+            throw new GatewayException(problem);
+        }
+    } // ________________________________
 
-	@Override
-	void getDefaultComposer() throws GatewayException
-	{
-		_composerName = PackageFileContents.class.getName();
-		_composerClass = PackageFileContents.class;
-		_composer = new PackageFileContents();
-		_logger.debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
-				+ "> element found in gateway configuration"
-				+ " -  Using default composer class : " + _composerName);
-	}
+    /**
+     * Removes the given file from the local filesystem.
+     *
+     * @param file the file to remove
+     * @return whatever {@link File#delete()} reports for that file
+     * @throws GatewayException declared by the overridden method; this
+     *                          implementation does not throw it itself
+     */
+    @Override
+    boolean deleteFile(File file) throws GatewayException {
+        final boolean removed = file.delete();
+        return removed;
+    }
 
-	@Override
-	void bytesToFile(byte[] bytes, File file) throws GatewayException 
-	{
-		try
-		{
-			if (file.exists() && !file.delete())
-				throw new GatewayException("Unable to delete existing file "+file);
-			FileOutputStream out = new FileOutputStream(file);
-			out.write(bytes);
-			out.close();
-		}
-		catch (Exception e) 
-		{
-			throw new GatewayException(e);
-		}
-	}
+    /**
+     * Reads the whole content of a local file into memory.
+     *
+     * @param file the file to read
+     * @return the bytes of the file, in the order they were read
+     * @throws GatewayException wrapping any IOException raised while
+     *                          opening, reading or closing the file
+     */
+    @Override
+    byte[] getFileContents(File file) throws GatewayException {
+        try {
+            FileInputStream inp = new FileInputStream(file);
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                byte[] ba = new byte[1000];  // TODO magic number
+                int iQread;
+                while (-1 != (iQread = inp.read(ba))) {
+                    if (iQread > 0) {
+                        out.write(ba, 0, iQread);
+                    }
+                }
+                // ByteArrayOutputStream.close() has no effect, so it is not called.
+                return out.toByteArray();
+            }
+            finally {
+                // Release the file handle even when a read fails part way through.
+                inp.close();
+            }
+        }
+        catch (IOException e) {
+            throw new GatewayException(e);
+        }
+    }
 
-	// ______________________________________________________________________________
-	/**
-	 * Default gateway action for files <p/>It will just drop the file contents
-	 * into a Message
-	 * 
-	 * @author <a
-	 *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
-	 * @since Version 4.0
-	 * 
-	 */
-	public class PackageFileContents
-	{
-		public Message process(Object obj) throws ConfigurationException, GatewayException
-		{
-			if (!(obj instanceof File))
-				throw new ConfigurationException("Object must be instance of File");
+    @Override
+    File[] getFileList() throws GatewayException {
+        /*
+           * JBESB-454 allows no input suffix. This means that we need to
+           * ignore any worker files, which are written to the same directory,
+           * but which have a well defined suffix. All other files (errors and
+           * post processed) go into separate directories, so we don't need
+           * to be concerned about them.
+           */
 
-			Message message = MessageFactory.getInstance().getMessage();
-			message.getBody().setByteArray(getFileContent((File) obj));
-			return message;
-		}
+        if (_inputFileFilter != null) {
+            /*
+            * Input suffix is not null.
+            */
 
-		private byte[] getFileContent(File file) throws ConfigurationException, GatewayException
-		{
-			return getFileContents(file);
-		}
+            return _inputDirectory.listFiles(_inputFileFilter);
+        } else {
+            /*
+            * Input suffix is null so ignore any worker files.
+            */
 
-		public void respond(Message msg, File file) throws Exception
-		{
-			File responseFile = new File(file.getParent(),file.getName()
-					+ FileEpr.DEFAULT_REPLY_TO_FILE_SUFFIX+"_gw");
-			bytesToFile(msg.getBody().getByteArray(), responseFile);
-		}
+            return _inputDirectory.listFiles(_ignoreFileFilter);
+        }
+    }
 
-	} // ____________________________________________________
+    @Override
+    boolean renameFile(File from, File to) throws GatewayException {
+        if (to.exists() && !to.delete()) {
+            throw new GatewayException("Cannot delete target file: "
+                    + to.getAbsolutePath());
+        }
 
-	/**
-	 * This implementation allows no input suffix, whereas the base class
-	 * requires an input suffix.
-	 */
-	
-	protected void checkInputSuffix () throws ConfigurationException
-        {
-	    // any suffix is allowed for input; set value to "" if null.
-	    
-	    if (_inputSuffix == null)
-		_inputSuffix = "";
+        return FileUtil.renameTo(from, to);
+    }
+
+    /**
+     * Installs {@link PackageFileContents} as the composer (name, class and
+     * instance) and records at debug level that the default is being used.
+     *
+     * @throws GatewayException declared by the overridden method; this
+     *                          implementation does not throw it itself
+     */
+    @Override
+    void getDefaultComposer() throws GatewayException {
+        final Class<PackageFileContents> defaultComposer = PackageFileContents.class;
+
+        _composerName = defaultComposer.getName();
+        _composerClass = defaultComposer;
+        _composer = new PackageFileContents();
+
+        final String note = "No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+                + "> element found in gateway configuration"
+                + " -  Using default composer class : " + _composerName;
+        _logger.debug(note);
+    }
+
+    @Override
+    void bytesToFile(byte[] bytes, File file) throws GatewayException {
+        try {
+            if (file.exists() && !file.delete())
+                throw new GatewayException("Unable to delete existing file " + file);
+            FileOutputStream out = new FileOutputStream(file);
+            out.write(bytes);
+            out.close();
         }
-	
-	/**
-	 * Simple file filter for local filesystem Will accept only files that end
-	 * with the String supplied at constructor time
-	 * 
-	 */
-	private class FileEndsWith implements FileFilter
-	{
-		String m_sSuffix;
+        catch (Exception e) {
+            throw new GatewayException(e);
+        }
+    }
 
-		FileEndsWith(String p_sEnd) throws ConfigurationException
-		{
-			m_sSuffix = p_sEnd;
-			if (Util.isNullString(m_sSuffix))
-				throw new ConfigurationException("Must specify file extension");
-		} // ______________________________
+    // ______________________________________________________________________________
+    /**
+     * Default gateway action for files <p/>It will just drop the file contents
+     * into a Message
+     *
+     * @author <a
+     *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+     * @since Version 4.0
+     */
+    public class PackageFileContents {
+        public Message process(Object obj) throws ConfigurationException, GatewayException {
+            if (!(obj instanceof File))
+                throw new ConfigurationException("Object must be instance of File");
 
-		public boolean accept(File p_f)
-		{
-			return (p_f.isFile()) ? p_f.toString().endsWith(m_sSuffix) : false;
-		}
-	}
-	
-	private class IgnoreFile implements FileFilter
-	{
-		String _workSuffix;
-		String _errorSuffix;
-		String _postSuffix;
+            Message message = MessageFactory.getInstance().getMessage();
+            message.getBody().setByteArray(getFileContent((File) obj));
+            return message;
+        }
 
-		IgnoreFile(String workSuffix, String errorSuffix, String postSuffix) throws ConfigurationException
-		{
-			_workSuffix = workSuffix;
-			_errorSuffix = errorSuffix;
-			_postSuffix = postSuffix;
-			
-			if (Util.isNullString(_workSuffix))
-				throw new ConfigurationException("Must specify workSuffix");
-			if (Util.isNullString(_errorSuffix))
-				throw new ConfigurationException("Must specify errorSuffix");
-			if (Util.isNullString(_postSuffix))
-				throw new ConfigurationException("Must specify postProcessSuffix");
-		} // ______________________________
+        private byte[] getFileContent(File file) throws ConfigurationException, GatewayException {
+            return getFileContents(file);
+        }
 
-		public boolean accept(File p_f)
-		{
-		    if (p_f.isFile())
-		    {
-			/*
-			 * If file is a work file then ignore it.
-			 */
-			
-			String fileName = p_f.toString();
-			
-			if (fileName.endsWith(_workSuffix) || fileName.endsWith(_errorSuffix) || fileName.endsWith(_postSuffix))
-			{
-			    return false;
-			}
-			else
-			    return true;
-		    }
-		    else
-			return false;
-		}
-	}
+        public void respond(Message msg, File file) throws Exception {
+            File responseFile = new File(file.getParent(), file.getName()
+                    + FileEpr.DEFAULT_REPLY_TO_FILE_SUFFIX + "_gw");
+            bytesToFile(msg.getBody().getByteArray(), responseFile);
+        }
 
-	private FileFilter _inputFileFilter;  // normal file filter
-	private FileFilter _ignoreFileFilter; // worker file filter (used if input suffix is null)
+    } // ____________________________________________________
 
+    /**
+     * This implementation allows no input suffix, whereas the base class
+     * requires an input suffix.
+     */
 
+    @Override
+    protected void checkInputSuffix() throws ConfigurationException {
+        // Every suffix is acceptable for this gateway. A missing suffix is
+        // stored as "" so that it is never left as null.
+        if (null == _inputSuffix) {
+            _inputSuffix = "";
+        }
+    }
+
+    /**
+     * File filter for the local filesystem that accepts only regular files
+     * whose path string ends with the suffix supplied at construction time.
+     */
+    private class FileEndsWith implements FileFilter {
+        String m_sSuffix;
+
+        /**
+         * @param p_sEnd the suffix a file must end with
+         * @throws ConfigurationException if Util.isNullString rejects the suffix
+         */
+        FileEndsWith(String p_sEnd) throws ConfigurationException {
+            if (Util.isNullString(p_sEnd)) {
+                throw new ConfigurationException("Must specify file extension");
+            }
+            m_sSuffix = p_sEnd;
+        } // ______________________________
+
+        /**
+         * @param p_f candidate file
+         * @return true only for a regular file whose path ends with the suffix
+         */
+        public boolean accept(File p_f) {
+            if (!p_f.isFile()) {
+                return false;
+            }
+            return p_f.toString().endsWith(m_sSuffix);
+        }
+    }
+
+    /**
+     * File filter used when no input suffix is configured. It accepts regular
+     * files unless their path string ends with the work, error or
+     * post-process suffix.
+     */
+    private class IgnoreFile implements FileFilter {
+        String _workSuffix;
+        String _errorSuffix;
+        String _postSuffix;
+
+        /**
+         * @param workSuffix  suffix of files currently being worked on
+         * @param errorSuffix suffix of files that failed processing
+         * @param postSuffix  suffix of files that were processed
+         * @throws ConfigurationException if Util.isNullString rejects any of
+         *                                the suffixes (checked in the order above)
+         */
+        IgnoreFile(String workSuffix, String errorSuffix, String postSuffix) throws ConfigurationException {
+            if (Util.isNullString(workSuffix)) {
+                throw new ConfigurationException("Must specify workSuffix");
+            }
+            if (Util.isNullString(errorSuffix)) {
+                throw new ConfigurationException("Must specify errorSuffix");
+            }
+            if (Util.isNullString(postSuffix)) {
+                throw new ConfigurationException("Must specify postProcessSuffix");
+            }
+
+            _workSuffix = workSuffix;
+            _errorSuffix = errorSuffix;
+            _postSuffix = postSuffix;
+        } // ______________________________
+
+        /**
+         * @param p_f candidate file
+         * @return false for anything that is not a regular file and for files
+         *         ending with one of the three suffixes; true otherwise
+         */
+        public boolean accept(File p_f) {
+            if (!p_f.isFile()) {
+                return false;
+            }
+
+            final String fileName = p_f.toString();
+            final boolean reserved = fileName.endsWith(_workSuffix)
+                    || fileName.endsWith(_errorSuffix)
+                    || fileName.endsWith(_postSuffix);
+            return !reserved;
+        }
+    }
+
+    private FileFilter _inputFileFilter;  // normal file filter
+    private FileFilter _ignoreFileFilter; // worker file filter (used if input suffix is null)
+
+
 }

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/XAbstractFileGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/XAbstractFileGateway.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/XAbstractFileGateway.java	2007-07-27 15:16:11 UTC (rev 13808)
@@ -0,0 +1,542 @@
+package org.jboss.soa.esb.listeners.gateway;
+
+import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleThreadState;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.util.Util;
+import org.jboss.soa.esb.util.ClassUtil;
+import org.jboss.soa.esb.couriers.*;
+import org.jboss.soa.esb.filter.FilterManager;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.net.URI;
+
+/**
+ * Base class for all file gateways: local filesystem, ftp, sftp and ftps.
+ * <p/>Implementations for file manipulation (getFileList, getFileContents,
+ * renameFile and deleteFile) must be provided by factory
+ *
+ * @author <a
+ *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+public abstract class XAbstractFileGateway extends
+        AbstractThreadedManagedLifecycle {
+    /** Returns the files to process; doRun() calls this once per polling cycle. */
+    abstract File[] getFileList() throws GatewayException;
+
+    /** Returns the content of the given file as a byte array. */
+    abstract byte[] getFileContents(File file) throws GatewayException;
+
+    /**
+     * Renames <code>from</code> to <code>to</code>. doRun() treats a false
+     * result on the initial rename as "somebody else got it first" and skips
+     * the file.
+     */
+    abstract boolean renameFile(File from, File to) throws GatewayException;
+
+    /**
+     * Deletes the given file. doRun() uses it to remove the work file after
+     * successful processing and to clear a previous error file.
+     */
+    abstract boolean deleteFile(File file) throws GatewayException;
+
+    /**
+     * Checks a directory before the gateway uses it; invoked on the input
+     * directory while the configuration is being validated.
+     */
+    abstract void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException;
+
+    /**
+     * Sets up the composer used when the configuration names none.
+     * NOTE(review): presumably invoked from resolveComposerClass() - confirm.
+     */
+    abstract void getDefaultComposer() throws GatewayException;
+
+    /** Writes the given bytes to the given file. */
+    abstract void bytesToFile(byte[] bytes, File file) throws GatewayException;
+
+    /**
+     * Keeps a reference to the configuration, sets the default polling
+     * interval of ten seconds and then validates the configured attributes.
+     *
+     * @param config configuration tree for this gateway
+     * @throws ConfigurationException propagated from the parameter check
+     * @throws RegistryException      propagated from the parameter check
+     * @throws GatewayException       propagated from the parameter check
+     */
+    protected XAbstractFileGateway(ConfigTree config)
+            throws ConfigurationException, RegistryException, GatewayException {
+        super(config);
+        this._sleepBetweenPolls = 10 * 1000; // milliseconds
+        this._config = config;
+        checkMyParms();
+    } // __________________________________
+
+    /**
+     * Handle the initialisation of the managed instance: looks up the EPRs of
+     * the target service in the registry and fails when none are returned.
+     *
+     * @throws org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException
+     *          if no EPR is found or the registry lookup itself fails.
+     */
+    protected void doInitialise() throws ManagedLifecycleException {
+        try {
+            _targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
+
+            final boolean noEprs = (null == _targetEprs) || (_targetEprs.size() < 1);
+            if (noEprs) {
+                final String reason = "EPR <" + _targetServiceName + "> not found in registry";
+                throw new ManagedLifecycleException(reason);
+            }
+        }
+        catch (final RegistryException re) {
+            throw new ManagedLifecycleException("Unexpected registry exception", re);
+        }
+    }
+
+    /**
+     * Execute on the thread.
+     *
+     * Polls until the lifecycle reaches STOPPING. In every cycle each file
+     * returned by getFileList() is renamed to its work name, handed to the
+     * composer's process method, passed through the output filters and
+     * delivered to the first target EPR whose courier accepts it. When a
+     * response wait time is configured, the reply (or a timeout notice) is
+     * handed to the composer's responder method. On success the work file is
+     * deleted or moved to the post-process directory; on failure it is moved
+     * to the error directory.
+     */
+    protected void doRun() {
+
+        EPR replyEpr = null;
+        Message replyMsg = null;
+
+        if (_logger.isDebugEnabled()) {
+            _logger.debug("run() method of " + this.getClass().getSimpleName()
+                    + " started on thread " + Thread.currentThread().getName());
+        }
+
+        do {
+            File[] fileList;
+            try {
+                fileList = getFileList();
+            }
+            catch (GatewayException e) {
+                _logger.error("Can't retrieve file list", e);
+                continue;
+            }
+            if (null == fileList) {
+                // File.listFiles() returns null on an I/O error; wait for the
+                // next poll instead of failing with a NullPointerException.
+                _logger.warn("File list is null - skipping this poll cycle");
+                continue;
+            }
+
+            for (File fileIn : fileList) {
+                // Try to rename - if unsuccessful, somebody else got it first
+                File fileWork = getWorkFileName(fileIn, _workingSuffix);
+                try {
+                    if (!renameFile(fileIn, fileWork))
+                        continue;
+                }
+                catch (GatewayException e) {
+                    _logger.error("Problems renaming file " + fileIn + " to "
+                            + fileWork, e);
+                    continue;
+                }
+
+                // A non-null 'thrown' after the try block marks the file as failed.
+                Throwable thrown = null;
+                String text = null;
+                try {
+                    Object obj = _processMethod.invoke(_composer, new Object[]
+                            {fileWork});
+                    if (null == obj) {
+                        // NOTE(review): the work file is left under its work
+                        // name in this case - confirm that is intended.
+                        _logger.warn("Action class method <"
+                                + _processMethod.getName()
+                                + "> returned a null object");
+                        continue;
+                    }
+                    boolean bSent = false;
+
+                    Message outMessage = (Message) obj;
+                    Map<String, Object> params = new HashMap<String, Object>();
+
+                    params.put(Environment.ORIGINAL_FILE, fileIn);
+                    params.put(Environment.GATEWAY_CONFIG, _config);
+
+                    outMessage = FilterManager.getInstance().doOutputWork(outMessage, params);
+
+                    // Deliver to the first EPR that accepts the message.
+                    for (EPR current : _targetEprs) {
+                        _courier = getCourier(current);
+                        try {
+                            replyEpr = null;
+                            outMessage.getHeader().getCall().setTo(current);
+                            if (_maxMillisForResponse > 0) {
+                                replyEpr = CourierUtil
+                                        .getDefaultReplyToEpr(current);
+                                outMessage.getHeader().getCall().setReplyTo(
+                                        replyEpr);
+                            }
+                            if (_courier.deliver(outMessage)) {
+                                bSent = true;
+                                break;
+                            }
+                        }
+                        finally {
+                            CourierUtil.cleanCourier(_courier);
+                        }
+                    }
+                    if (!bSent) {
+                        text = "Target service <" + _targetServiceCategory
+                                + "," + _targetServiceName
+                                + "> is not registered";
+                        thrown = new Exception(text);
+                    } else if (null != replyEpr) {
+                        TwoWayCourier replyCourier = CourierFactory
+                                .getPickupCourier(replyEpr);
+                        try {
+                            replyMsg = replyCourier
+                                    .pickup(_maxMillisForResponse);
+                            _responderMethod.invoke(_composer, new Object[]
+                                    {replyMsg, fileIn});
+                        }
+                        catch (CourierTimeoutException e) {
+                            thrown = e;
+                            text = "Expected response was not received from invoked service";
+                            replyMsg = MessageFactory.getInstance()
+                                    .getMessage();
+                            // Use the field value, not the literal text "_targetServiceName".
+                            String timedOut = "Service <"
+                                    + _targetServiceCategory + ","
+                                    + _targetServiceName
+                                    + "> timed out without sending response";
+                            replyMsg.getBody()
+                                    .setByteArray(timedOut.getBytes());
+                            _responderMethod.invoke(_composer, new Object[]
+                                    {replyMsg, fileIn});
+                        }
+                        finally {
+                            if (null != replyCourier)
+                                CourierUtil.cleanCourier(replyCourier);
+                        }
+                    }
+                }
+                catch (InvocationTargetException e) {
+                    thrown = e;
+                    text = "Problems invoking method <"
+                            + _processMethod.getName() + ">";
+                }
+                catch (IllegalAccessException e) {
+                    thrown = e;
+                    text = "Problems invoking method <"
+                            + _processMethod.getName() + ">";
+                }
+                catch (ClassCastException e) {
+                    thrown = e;
+                    text = "Action class method <" + _processMethod.getName()
+                            + "> returned a non Message object";
+                }
+                catch (CourierException e) {
+                    thrown = e;
+                    if (null != _courier)
+                        text = "Courier <" + _courier.getClass().getName()
+                                + ".deliverAsync(Message) FAILED";
+                    else
+                        text = "NULL courier can't deliverAsync Message";
+                }
+                catch (MalformedEPRException e) {
+                    thrown = e;
+                    if (null != _courier)
+                        text = "Courier <"
+                                + _courier.getClass().getName()
+                                + ".deliverAsync(Message) FAILED with malformed EPR.";
+                    else
+                        text = "NULL courier can't deliverAsync Message";
+                }
+
+                if (null == thrown) {
+                    File fileOK = new File(_postProcessDirectory, fileIn
+                            .getName()
+                            + _postProcessSuffix);
+                    if (_deleteAfterOK) {
+                        try {
+                            deleteFile(fileWork);
+                        }
+                        catch (GatewayException e) {
+                            _logger.error("File " + fileIn
+                                    + " has been processed and renamed to "
+                                    + fileWork
+                                    + ", but there were problems deleting it from the input directory ",
+                                    e);
+                        }
+                    } else {
+                        try {
+                            renameFile(fileWork, fileOK);
+                        }
+                        catch (GatewayException e) {
+                            _logger.error("File " + fileIn
+                                    + " has been processed and renamed to "
+                                    + fileWork
+                                    + ", but there were problems renaming it to "
+                                    + fileOK, e);
+                        }
+                    }
+                } else {
+                    // The logger receives the throwable, so no separate
+                    // printStackTrace() to stderr is needed.
+                    _logger.error(text, thrown);
+                    File fileError = new File(_errorDirectory, fileIn.getName()
+                            + _errorSuffix);
+                    try {
+                        deleteFile(fileError);
+                    }
+                    catch (GatewayException e) {
+                        _logger.warn("File : " + fileError + " did not exist.");
+                    }
+                    try {
+                        renameFile(fileWork, fileError);
+                    }
+                    catch (GatewayException e) {
+                        _logger.error("Problems renaming file " + fileWork
+                                + " to " + fileError, e);
+                    }
+                }
+            }
+        }
+        while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
+                _sleepBetweenPolls));
+
+        if (_logger.isDebugEnabled()) {
+            _logger.debug("run() method of " + this.getClass().getSimpleName()
+                    + " finished on thread "
+                    + Thread.currentThread().getName());
+        }
+    } // ________________________________
+
+    /**
+     * Builds the name a file is renamed to while it is being processed.
+     *
+     * @param fileIn the file about to be processed
+     * @param suffix suffix to append to the file's path; when null the
+     *               configured working suffix is used instead
+     * @return a File whose path is the input path followed by the suffix
+     */
+    protected File getWorkFileName(File fileIn, String suffix) {
+        // The argument used to be ignored in favour of _workingSuffix. It is
+        // honoured now, with the old value kept as the fallback for null.
+        String workSuffix = (null != suffix) ? suffix : _workingSuffix;
+        return new File(fileIn.toString() + workSuffix);
+    }
+
+    /**
+     * Obtains the courier for the given EPR from CourierFactory.
+     * Extracted into its own method to simplify testing.
+     *
+     * @param current the EPR a courier is needed for
+     * @return the courier returned by the factory
+     * @throws CourierException      propagated from the factory
+     * @throws MalformedEPRException propagated from the factory
+     */
+    protected Courier getCourier(EPR current) throws CourierException,
+            MalformedEPRException {
+        final Courier courier = CourierFactory.getCourier(current);
+        return courier;
+    }
+
+    /**
+     * Handle the threaded destroy of the managed instance.
+     * This class performs no work here; the body is empty.
+     *
+     * @throws org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException
+     *          for errors while destroying.
+     */
+    protected void doThreadedDestroy() throws ManagedLifecycleException {
+        // no-op
+    }
+
+    /**
+     * Is the input suffix valid for this type of gateway?
+     * This base implementation requires a non-empty suffix; a subclass can
+     * override the method to relax that rule.
+     *
+     * @throws ConfigurationException if the input suffix is missing or empty
+     */
+    protected void checkInputSuffix() throws ConfigurationException {
+        // A null suffix is reported as a configuration problem instead of
+        // surfacing as a NullPointerException.
+        if (null == _inputSuffix || _inputSuffix.length() < 1)
+            throw new ConfigurationException("Invalid "
+                    + ListenerTagNames.FILE_INPUT_SFX_TAG + " attribute");
+    }
+
+    /**
+     * Read and validate this gateway's configuration attributes from
+     * <code>_config</code>, storing the results in the instance fields.
+     * <p>
+     * In order: target service category and name, poll latency (seconds in
+     * the configuration, kept in milliseconds), composer class and methods
+     * (via {@link #resolveComposerClass()}), maximum wait for a response,
+     * then the input, work, error and post-process directories and suffixes.
+     * The post-process settings are not read when files are to be deleted
+     * after successful processing.
+     *
+     * @throws org.jboss.soa.esb.ConfigurationException
+     *          -
+     *          if a suffix is empty or clashes with the input suffix, the
+     *          URL attribute is malformed, or the composer class or its
+     *          methods cannot be resolved
+     * @throws RegistryException declared by the signature; which callee
+     *          raises it is not visible in this method - TODO confirm
+     * @throws GatewayException propagated unchanged from the helpers called
+     *          here (e.g. resolveComposerClass(), which declares it)
+     */
+    private void checkMyParms() throws ConfigurationException,
+            RegistryException, GatewayException {
+        // Third arg (the default) is null - Exception will be thrown if the
+        // attribute is not found, which makes both attributes mandatory
+        _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+                ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+        _targetServiceName = ListenerUtil.obtainAtt(_config,
+                ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+
+        // Polling interval: configured in seconds, stored in milliseconds
+        String sAux = _config
+                .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+
+        if (!Util.isNullString(sAux)) {
+            try {
+                _sleepBetweenPolls = 1000 * Long.parseLong(sAux); // NOTE(review): no range check, zero/negative values are kept
+            }
+            catch (NumberFormatException e) {
+                _logger.warn("Invalid poll latency - keeping default of "
+                        + (_sleepBetweenPolls / 1000)); // field is left untouched on a parse failure
+            }
+        } else {
+            _logger.warn("No value specified for: "
+                    + ListenerTagNames.POLL_LATENCY_SECS_TAG
+                    + " -  Using default of " + (_sleepBetweenPolls / 1000));
+        }
+
+        resolveComposerClass();
+
+        boolean hasResponder = _responderMethod != null; // assigned in resolveComposerClass()
+        _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait(_config,
+                _logger, hasResponder);
+        try {
+            // INPUT directory and suffix (used for FileFilter)
+            String url = _config.getAttribute(ListenerTagNames.URL_TAG);
+            String sInpDir = (null != url) ? new URL(url).getFile() // the URL attribute wins when present
+                    : ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_INPUT_DIR_TAG, null); // otherwise the input-dir attribute is read
+            _inputDirectory = fileFromString(sInpDir);
+            seeIfOkToWorkOnDir(_inputDirectory);
+
+            _inputSuffix = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_INPUT_SFX_TAG, null);
+            _inputSuffix = _inputSuffix.trim();
+
+            checkInputSuffix(); // protected, so subclasses may apply their own rule
+
+            // WORK suffix (will rename in input directory)
+            _workingSuffix = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_WORK_SFX_TAG, ".esbWork").trim();
+            if (_workingSuffix.length() < 1)
+                throw new ConfigurationException("Invalid "
+                        + ListenerTagNames.FILE_WORK_SFX_TAG + " attribute");
+
+            if (_inputSuffix.equals(_workingSuffix))
+                throw new ConfigurationException(
+                        "Work suffix must differ from input suffix <"
+                                + _workingSuffix + ">");
+
+            // ERROR directory and suffix: the directory defaults to the
+            // input directory and the suffix defaults to
+            // ".esbError"
+            String sErrDir = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_ERROR_DIR_TAG, sInpDir);
+            _errorDirectory = fileFromString(sErrDir);
+            seeIfOkToWorkOnDir(_errorDirectory);
+
+            _errorSuffix = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_ERROR_SFX_TAG, ".esbError").trim();
+            if (_errorSuffix.length() < 1)
+                throw new ConfigurationException("Invalid "
+                        + ListenerTagNames.FILE_ERROR_SFX_TAG + " attribute");
+            if (_errorDirectory.equals(_inputDirectory)
+                    && _inputSuffix.equals(_errorSuffix)) // equal suffixes are rejected only within the same directory
+                throw new ConfigurationException(
+                        "Error suffix must differ from input suffix <"
+                                + _errorSuffix + ">");
+
+            // Do users wish to delete files that were processed OK ?
+            String sPostDel = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_POST_DEL_TAG, "false").trim();
+            _deleteAfterOK = Boolean.parseBoolean(sPostDel);
+            if (_deleteAfterOK)
+                return; // skips the post-process directory/suffix settings below
+
+            // POST (done) directory and suffix: the directory defaults to the
+            // input directory and the suffix defaults to ".esbDone"
+            String sPostDir = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_POST_DIR_TAG, sInpDir);
+            _postProcessDirectory = fileFromString(sPostDir);
+            seeIfOkToWorkOnDir(_postProcessDirectory);
+            _postProcessSuffix = ListenerUtil.obtainAtt(_config,
+                    ListenerTagNames.FILE_POST_SFX_TAG, ".esbDone").trim();
+
+            if (_postProcessDirectory.equals(_inputDirectory)) { // suffix rules apply only when sharing the input directory
+                if (_postProcessSuffix.length() < 1)
+                    throw new ConfigurationException("Invalid "
+                            + ListenerTagNames.FILE_POST_SFX_TAG + " attribute");
+                if (_postProcessSuffix.equals(_inputSuffix))
+                    throw new ConfigurationException(
+                            "Post process suffix must differ from input suffix <"
+                                    + _postProcessSuffix + ">");
+            }
+        }
+        catch (GatewayException ex) {
+            throw ex; // propagated unchanged
+        }
+        catch (MalformedURLException ex) {
+            throw new ConfigurationException(ex); // raised by new URL(url) above
+        }
+    } // ________________________________
+
+    /**
+     * Work out which composer class this gateway uses and look up its
+     * process and responder methods by reflection.
+     * <p>
+     * When the composer-class attribute is absent, getDefaultComposer() is
+     * called and the method names "process" and "respond" are used.
+     * Otherwise the named class is loaded, instantiated through its
+     * (ConfigTree) constructor, and the method names are taken from the
+     * configuration (process method defaults to "process"; the responder
+     * method is optional and leaves <code>_responderMethod</code> null when
+     * not configured).
+     *
+     * @throws ConfigurationException wrapping any reflection failure (class
+     *                                or method not found, instantiation or
+     *                                access problems, constructor exception)
+     * @throws GatewayException       declared by the signature for the
+     *                                helpers called here
+     */
+    protected void resolveComposerClass() throws ConfigurationException,
+            GatewayException {
+        String processMethodName;
+        String responderMethodName;
+        try {
+            _composerName = _config
+                    .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+
+            if (null == _composerName) {
+                // no class attribute: fall back to the default composer
+                getDefaultComposer();
+                processMethodName = "process";
+                responderMethodName = "respond";
+            } else {
+                // class attribute present: load and instantiate it
+                _composerClass = ClassUtil.forName(_composerName, getClass());
+                Constructor configTreeCtor = _composerClass
+                        .getConstructor(new Class[]{ConfigTree.class});
+                _composer = configTreeCtor.newInstance(_config);
+                processMethodName = _config.getAttribute(
+                        ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+                        "process");
+                responderMethodName = _config.getAttribute(
+                        ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
+            }
+
+            _processMethod = _composerClass.getMethod(processMethodName,
+                    new Class[]{Object.class});
+
+            if (null == responderMethodName) {
+                _responderMethod = null;
+            } else {
+                _responderMethod = _composerClass.getMethod(
+                        responderMethodName,
+                        new Class[]{Message.class, File.class});
+            }
+        }
+        catch (ClassNotFoundException ex) {
+            throw new ConfigurationException(ex);
+        }
+        catch (NoSuchMethodException ex) {
+            throw new ConfigurationException(ex);
+        }
+        catch (InstantiationException ex) {
+            throw new ConfigurationException(ex);
+        }
+        catch (IllegalAccessException ex) {
+            throw new ConfigurationException(ex);
+        }
+        catch (InvocationTargetException ex) {
+            throw new ConfigurationException(ex);
+        }
+    } // ________________________________
+
+    private File fileFromString(String file) {
+        try {
+            return new File(new URI(file));
+        }
+        catch (Exception e) {
+            return new File(file);
+        }
+    } // ________________________________
+
+    protected final static Logger _logger = Logger
+            .getLogger(XAbstractFileGateway.class); // logger named after this class
+
+    protected ConfigTree _config; // configuration the gateway attributes are read from
+
+    protected long _sleepBetweenPolls; // milliseconds (configured poll latency in seconds x 1000)
+
+    protected long _maxMillisForResponse; // from ListenerUtil.getMaxMillisGatewayWait(...)
+
+    protected String _targetServiceCategory, _targetServiceName; // target-service attributes read in checkMyParms()
+
+    protected Collection<EPR> _targetEprs; // NOTE(review): presumably the target service's EPRs - confirm where populated
+
+    protected String _composerName; // composer-class attribute; null selects the default composer
+
+    protected Class _composerClass; // class whose process/responder methods are looked up
+
+    protected Object _composer; // built via the (ConfigTree) constructor when a composer class is configured
+
+    protected Method _processMethod; // single-argument (Object) method on the composer class
+
+    protected Method _responderMethod; // (Message, File) method on the composer class, or null if none configured
+
+    protected Courier _courier; // NOTE(review): presumably obtained through getCourier(EPR) - confirm
+
+    protected boolean _deleteAfterOK; // parsed from the post-delete attribute, default "false"
+
+    protected File _inputDirectory, _errorDirectory, _postProcessDirectory; // set in checkMyParms()
+
+    protected String _inputSuffix, _postProcessSuffix, _workingSuffix,
+            _errorSuffix; // trimmed file-name suffixes validated in checkMyParms()
+
+    protected FileFilter _fileFilter; // NOTE(review): presumably selects input files by suffix - confirm
+
+} // ____________________________________________________________________________


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/XAbstractFileGateway.java
___________________________________________________________________
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list