[jboss-svn-commits] JBL Code SVN: r12088 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners: gateway and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue May 22 18:35:44 EDT 2007


Author: estebanschifman
Date: 2007-05-22 18:35:44 -0400 (Tue, 22 May 2007)
New Revision: 12088

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java
Log:
Add support for 'blocking' or 'two-way' file gateway (local and remote)

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2007-05-22 22:35:44 UTC (rev 12088)
@@ -68,6 +68,8 @@
     /** Gateway Composer */
     public static final String GATEWAY_COMPOSER_CLASS_TAG   = "composer-class";
     public static final String GATEWAY_COMPOSER_METHOD_TAG  = "composer-process";
+    public static final String GATEWAY_RESPONDER_METHOD_TAG = "responder-process";
+    public static final String GATEWAY_WAIT_MILLIS_TAG 		= "max-millis-for-response";
     /** Routing */
     public static final String DESTINATION_NAME_TAG         = "destination-name";
     /** Content Based Routing */

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-05-22 22:35:44 UTC (rev 12088)
@@ -365,6 +365,43 @@
 			_logger.debug("No value specified for " + tag + " attribute"
 					+ " -  Using default value: '" + defaultValue + "'");
 	} // ________________________________
+	
+	public static long getMaxMillisGatewayWait(ConfigTree tree, Logger logr, boolean hasResponder)
+		throws ConfigurationException
+	{
+		long maxWait = -1;
+		String sAux = tree.getAttribute(ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG);
+		if (!Util.isNullString(sAux))
+		{
+			try
+			{
+				maxWait = Long.parseLong(sAux);
+				if (maxWait < 1)
+					logr.warn("Value specified for "
+							+ ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG
+							+ " ("+maxWait
+							+") implies that this will be an 'inbound-only' gateway");
+				else if (! hasResponder)
+					throw new ConfigurationException(
+							"For two-way gateways (max-millis-for-response>0)"
+							+", you must specify the '"
+							+ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG
+							+"' attribute");
+			}
+			catch (NumberFormatException e)
+			{
+				throw new ConfigurationException("Invalid value for "
+						+ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG,e);
+			}
+		}
+		else
+		{
+			logr.warn("No value specified for: "
+					+ ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG
+					+ " -  This will be an 'inbound-only' gateway");
+		}
+		return maxWait;
+	} //________________________________
 
 	private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java	2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java	2007-05-22 22:35:44 UTC (rev 12088)
@@ -40,7 +40,9 @@
 import org.jboss.soa.esb.couriers.Courier;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.TwoWayCourier;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -49,6 +51,7 @@
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleThreadState;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.ClassUtil;
 import org.jboss.soa.esb.util.Util;
@@ -77,6 +80,8 @@
 
 	abstract void getDefaultComposer() throws GatewayException;
 
+	abstract void bytesToFile(byte[] bytes, File file) throws GatewayException;
+
 	protected AbstractFileGateway(ConfigTree config) throws ConfigurationException, RegistryException, GatewayException
 	{
             super(config) ;
@@ -110,13 +115,17 @@
          */
         protected void doRun()
         {
-                if (_logger.isDebugEnabled())
-                {
+
+        	EPR 	replyEpr = null;
+        	Message replyMsg = null;
+        	
+        	if (_logger.isDebugEnabled())
+        	{
         		_logger.debug("run() method of " + this.getClass().getSimpleName()
     				+ " started on thread " + Thread.currentThread().getName());
-                }
+            }
 
-                do
+        do
 		{
 			File[] fileList;
 			try
@@ -159,6 +168,7 @@
 						continue;
 					}
 					boolean bSent = false;
+
                 	Message outMessage = (Message) obj;
                 	outMessage.getProperties().setProperty(ORIGINAL_FILE_NAME_MSG_PROP, fileIn.getName());
 					for (EPR current : _targetEprs)
@@ -178,20 +188,26 @@
 							{
 								_logger.error("Problems with file EPR", e);							}
 						}
-//						_courier = CourierFactory.getCourier(current);
 						_courier = getCourier(current);
-                                                try
-                                                {
-                                                    if (_courier.deliver(outMessage))
-                                                    {
-                                                    	bSent = true;
-                                                    	break;
-                                                    }
-                                                }
-                                                finally
-                                                {
-                                                    CourierUtil.cleanCourier(_courier) ;
-                                                }
+                        try
+                        {
+                        	replyEpr = null;
+                        	outMessage.getHeader().getCall().setTo(current);
+                        	if (_maxMillisForResponse>0)
+                        	{
+                        		replyEpr = CourierUtil.getDefaultReplyToEpr(current);
+                        		outMessage.getHeader().getCall().setReplyTo(replyEpr);
+                        	}
+                            if (_courier.deliver(outMessage))
+                            {
+                            	bSent = true;
+                            	break;
+                            }
+                        }
+                        finally
+                        {
+                            CourierUtil.cleanCourier(_courier) ;
+                        }
                                                 
 					}
 					if (!bSent)
@@ -201,6 +217,31 @@
 								+ "> is not registered";
 						thrown = new Exception(text);
 					}
+					else if (null!=replyEpr)
+					{
+						TwoWayCourier replyCourier = CourierFactory.getPickupCourier(replyEpr);
+						try
+						{
+							replyMsg = replyCourier.pickup(_maxMillisForResponse);
+							_responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
+						}
+						catch (CourierTimeoutException e)
+						{
+							thrown = e;
+							text = "Expected response was not received from invoked service";
+							replyMsg = MessageFactory.getInstance().getMessage();
+							String timedOut = "Service <"
+								+_targetServiceCategory+","+"_targetServiceName"
+								+"> timed out without sending response";
+							replyMsg.getBody().setByteArray(timedOut.getBytes());
+							_responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
+						}
+						finally
+						{
+							if (null!=replyCourier)
+								CourierUtil.cleanCourier(replyCourier);
+						}
+					}
 				}
 				catch (InvocationTargetException e)
 				{
@@ -385,6 +426,9 @@
 
 		resolveComposerClass();
 
+		boolean hasResponder = _responderMethod!=null; 
+		_maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait
+			(_config, _logger, hasResponder);
 		try
 		{
 			// INPUT directory and suffix (used for FileFilter)
@@ -467,9 +511,10 @@
 
 	protected void resolveComposerClass() throws ConfigurationException, GatewayException
 	{
+		String sProcessMethod 	= null;
+		String sResponderMethod = null;
 		try
 		{
-			String sProcessMethod = null;
 			_composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
 			if (null != _composerName)
 			{ // class attribute
@@ -478,15 +523,21 @@
 				{ ConfigTree.class });
 				_composer = oConst.newInstance(_config);
 				sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
+				sResponderMethod = _config.getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
 			}
 			else
 			{
 				getDefaultComposer();
-				sProcessMethod = "process";
+				sProcessMethod 	 = "process";
+				sResponderMethod = "respond";
 			}
 	
-			_processMethod = _composerClass.getMethod(sProcessMethod, new Class[]
-			{ Object.class });
+			_processMethod = _composerClass.getMethod
+				(sProcessMethod, new Class[]{ Object.class });
+
+			_responderMethod = (null==sResponderMethod) ? null
+				: _composerClass.getMethod(sResponderMethod
+						, new Class[] {Message.class, File.class});
 		}
 		catch (InvocationTargetException ex)
 		{
@@ -528,6 +579,7 @@
 	protected ConfigTree _config;
 
 	protected long _sleepBetweenPolls; // milliseconds
+	protected long _maxMillisForResponse;
 
 	protected String _targetServiceCategory, _targetServiceName;
 
@@ -540,6 +592,7 @@
 	protected Object _composer;
 
 	protected Method _processMethod;
+	protected Method _responderMethod;
 
 	protected Courier _courier;
 
@@ -554,4 +607,5 @@
 	
 	/** Message property name for original filename */
 	public static final String ORIGINAL_FILE_NAME_MSG_PROP = "org.jboss.soa.esb.gateway.original.file.name";
+	
 } // ____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java	2007-05-22 22:35:44 UTC (rev 12088)
@@ -26,9 +26,11 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.FileEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
@@ -173,6 +175,23 @@
 				+ " -  Using default composer class : " + _composerName);
 	}
 
+	@Override
+	void bytesToFile(byte[] bytes, File file) throws GatewayException 
+	{
+		try
+		{
+			if (file.exists() && !file.delete())
+				throw new GatewayException("Unable to delete existing file "+file);
+			FileOutputStream out = new FileOutputStream(file);
+			out.write(bytes);
+			out.close();
+		}
+		catch (Exception e) 
+		{
+			throw new GatewayException(e);
+		}
+	}
+
 	// ______________________________________________________________________________
 	/**
 	 * Default gateway action for files <p/>It will just drop the file contents
@@ -199,6 +218,14 @@
 		{
 			return getFileContents(file);
 		}
+
+		public void respond(Message msg, File file) throws Exception
+		{
+			File responseFile = new File(file.getParent(),file.getName()
+					+ FileEpr.DEFAULT_REPLY_TO_FILE_SUFFIX+"_gw");
+			bytesToFile(msg.getBody().getByteArray(), responseFile);
+		}
+
 	} // ____________________________________________________
 
 	/**
@@ -281,4 +308,5 @@
 	private FileFilter _inputFileFilter;  // normal file filter
 	private FileFilter _ignoreFileFilter; // worker file filter (used if input suffix is null)
 
+
 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java	2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java	2007-05-22 22:35:44 UTC (rev 12088)
@@ -81,6 +81,7 @@
 		
 	}
 
+	@Override
 	protected void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException
 	{
 		// TODO: Implement. Very expensive though.
@@ -92,6 +93,7 @@
 
 	}
 
+	@Override
 	public boolean deleteFile(File file) throws GatewayException
 	{
 		RemoteFileSystem rfs = null;
@@ -118,6 +120,7 @@
 		}
 	}
 
+	@Override
 	byte[] getFileContents(File file) throws GatewayException
 	{
 		RemoteFileSystem rfs = null;
@@ -176,6 +179,7 @@
 		}
 	}
 
+	@Override
 	boolean renameFile(File from, File to) throws GatewayException
 	{
 		RemoteFileSystem rfs = null;
@@ -199,4 +203,31 @@
 				rfs.quit();
 		}
 	}
+	@Override
+	void bytesToFile(byte[] bytes, File file) throws GatewayException 
+	{
+		RemoteFileSystem rfs = null;
+		try
+		{
+			EPR epr = ListenerUtil.assembleEpr(_config);
+			if (!(epr instanceof FTPEpr))  
+				throw new Exception("This Gateway only accepts FTP and SFTP.");
+
+			FTPEpr ftpEpr = (FTPEpr) epr;
+			rfs = RemoteFileSystemFactory.getRemoteFileSystem(ftpEpr, true);
+			rfs.setRemoteDir(FtpClientUtil.fileToFtpString(_inputDirectory));
+
+			File tmpFile = File.createTempFile("RemoteGW", ".forUpload");
+			super.bytesToFile(bytes, tmpFile);
+			rfs.uploadFile(tmpFile, file.getName());
+			tmpFile.delete();
+
+		} catch (Exception e) {
+			throw new GatewayException(e);
+		} finally {
+			if (null != rfs)
+				rfs.quit();
+		}
+	}
+
 }




More information about the jboss-svn-commits mailing list