[jboss-svn-commits] JBL Code SVN: r12088 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners: gateway and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue May 22 18:35:44 EDT 2007
Author: estebanschifman
Date: 2007-05-22 18:35:44 -0400 (Tue, 22 May 2007)
New Revision: 12088
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java
Log:
Add support for 'blocking' or 'two-way' file gateway (local and remote)
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2007-05-22 22:35:44 UTC (rev 12088)
@@ -68,6 +68,8 @@
/** Gateway Composer */
public static final String GATEWAY_COMPOSER_CLASS_TAG = "composer-class";
public static final String GATEWAY_COMPOSER_METHOD_TAG = "composer-process";
+ public static final String GATEWAY_RESPONDER_METHOD_TAG = "responder-process";
+ public static final String GATEWAY_WAIT_MILLIS_TAG = "max-millis-for-response";
/** Routing */
public static final String DESTINATION_NAME_TAG = "destination-name";
/** Content Based Routing */
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-05-22 22:35:44 UTC (rev 12088)
@@ -365,6 +365,43 @@
_logger.debug("No value specified for " + tag + " attribute"
+ " - Using default value: '" + defaultValue + "'");
} // ________________________________
+
+ public static long getMaxMillisGatewayWait(ConfigTree tree, Logger logr, boolean hasResponder)
+ throws ConfigurationException
+ {
+ long maxWait = -1;
+ String sAux = tree.getAttribute(ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG);
+ if (!Util.isNullString(sAux))
+ {
+ try
+ {
+ maxWait = Long.parseLong(sAux);
+ if (maxWait < 1)
+ logr.warn("Value specified for "
+ + ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG
+ + " ("+maxWait
+ +") implies that this will be an 'inbound-only' gateway");
+ else if (! hasResponder)
+ throw new ConfigurationException(
+ "For two-way gateways (max-millis-for-response>0)"
+ +", you must specify the '"
+ +ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG
+ +"' attribute");
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("Invalid value for "
+ +ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG,e);
+ }
+ }
+ else
+ {
+ logr.warn("No value specified for: "
+ + ListenerTagNames.GATEWAY_WAIT_MILLIS_TAG
+ + " - This will be an 'inbound-only' gateway");
+ }
+ return maxWait;
+ } //________________________________
private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-05-22 22:35:44 UTC (rev 12088)
@@ -40,7 +40,9 @@
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.TwoWayCourier;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -49,6 +51,7 @@
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleThreadState;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.ClassUtil;
import org.jboss.soa.esb.util.Util;
@@ -77,6 +80,8 @@
abstract void getDefaultComposer() throws GatewayException;
+ abstract void bytesToFile(byte[] bytes, File file) throws GatewayException;
+
protected AbstractFileGateway(ConfigTree config) throws ConfigurationException, RegistryException, GatewayException
{
super(config) ;
@@ -110,13 +115,17 @@
*/
protected void doRun()
{
- if (_logger.isDebugEnabled())
- {
+
+ EPR replyEpr = null;
+ Message replyMsg = null;
+
+ if (_logger.isDebugEnabled())
+ {
_logger.debug("run() method of " + this.getClass().getSimpleName()
+ " started on thread " + Thread.currentThread().getName());
- }
+ }
- do
+ do
{
File[] fileList;
try
@@ -159,6 +168,7 @@
continue;
}
boolean bSent = false;
+
Message outMessage = (Message) obj;
outMessage.getProperties().setProperty(ORIGINAL_FILE_NAME_MSG_PROP, fileIn.getName());
for (EPR current : _targetEprs)
@@ -178,20 +188,26 @@
{
_logger.error("Problems with file EPR", e); }
}
-// _courier = CourierFactory.getCourier(current);
_courier = getCourier(current);
- try
- {
- if (_courier.deliver(outMessage))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier) ;
- }
+ try
+ {
+ replyEpr = null;
+ outMessage.getHeader().getCall().setTo(current);
+ if (_maxMillisForResponse>0)
+ {
+ replyEpr = CourierUtil.getDefaultReplyToEpr(current);
+ outMessage.getHeader().getCall().setReplyTo(replyEpr);
+ }
+ if (_courier.deliver(outMessage))
+ {
+ bSent = true;
+ break;
+ }
+ }
+ finally
+ {
+ CourierUtil.cleanCourier(_courier) ;
+ }
}
if (!bSent)
@@ -201,6 +217,31 @@
+ "> is not registered";
thrown = new Exception(text);
}
+ else if (null!=replyEpr)
+ {
+ TwoWayCourier replyCourier = CourierFactory.getPickupCourier(replyEpr);
+ try
+ {
+ replyMsg = replyCourier.pickup(_maxMillisForResponse);
+ _responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
+ }
+ catch (CourierTimeoutException e)
+ {
+ thrown = e;
+ text = "Expected response was not received from invoked service";
+ replyMsg = MessageFactory.getInstance().getMessage();
+ String timedOut = "Service <"
+ +_targetServiceCategory+","+"_targetServiceName"
+ +"> timed out without sending response";
+ replyMsg.getBody().setByteArray(timedOut.getBytes());
+ _responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
+ }
+ finally
+ {
+ if (null!=replyCourier)
+ CourierUtil.cleanCourier(replyCourier);
+ }
+ }
}
catch (InvocationTargetException e)
{
@@ -385,6 +426,9 @@
resolveComposerClass();
+ boolean hasResponder = _responderMethod!=null;
+ _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait
+ (_config, _logger, hasResponder);
try
{
// INPUT directory and suffix (used for FileFilter)
@@ -467,9 +511,10 @@
protected void resolveComposerClass() throws ConfigurationException, GatewayException
{
+ String sProcessMethod = null;
+ String sResponderMethod = null;
try
{
- String sProcessMethod = null;
_composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
if (null != _composerName)
{ // class attribute
@@ -478,15 +523,21 @@
{ ConfigTree.class });
_composer = oConst.newInstance(_config);
sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
+ sResponderMethod = _config.getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
}
else
{
getDefaultComposer();
- sProcessMethod = "process";
+ sProcessMethod = "process";
+ sResponderMethod = "respond";
}
- _processMethod = _composerClass.getMethod(sProcessMethod, new Class[]
- { Object.class });
+ _processMethod = _composerClass.getMethod
+ (sProcessMethod, new Class[]{ Object.class });
+
+ _responderMethod = (null==sResponderMethod) ? null
+ : _composerClass.getMethod(sResponderMethod
+ , new Class[] {Message.class, File.class});
}
catch (InvocationTargetException ex)
{
@@ -528,6 +579,7 @@
protected ConfigTree _config;
protected long _sleepBetweenPolls; // milliseconds
+ protected long _maxMillisForResponse;
protected String _targetServiceCategory, _targetServiceName;
@@ -540,6 +592,7 @@
protected Object _composer;
protected Method _processMethod;
+ protected Method _responderMethod;
protected Courier _courier;
@@ -554,4 +607,5 @@
/** Message property name for original filename */
public static final String ORIGINAL_FILE_NAME_MSG_PROP = "org.jboss.soa.esb.gateway.original.file.name";
+
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2007-05-22 22:35:44 UTC (rev 12088)
@@ -26,9 +26,11 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.FileEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
@@ -173,6 +175,23 @@
+ " - Using default composer class : " + _composerName);
}
+ @Override
+ void bytesToFile(byte[] bytes, File file) throws GatewayException
+ {
+ try
+ {
+ if (file.exists() && !file.delete())
+ throw new GatewayException("Unable to delete existing file "+file);
+ FileOutputStream out = new FileOutputStream(file);
+ out.write(bytes);
+ out.close();
+ }
+ catch (Exception e)
+ {
+ throw new GatewayException(e);
+ }
+ }
+
// ______________________________________________________________________________
/**
* Default gateway action for files <p/>It will just drop the file contents
@@ -199,6 +218,14 @@
{
return getFileContents(file);
}
+
+ public void respond(Message msg, File file) throws Exception
+ {
+ File responseFile = new File(file.getParent(),file.getName()
+ + FileEpr.DEFAULT_REPLY_TO_FILE_SUFFIX+"_gw");
+ bytesToFile(msg.getBody().getByteArray(), responseFile);
+ }
+
} // ____________________________________________________
/**
@@ -281,4 +308,5 @@
private FileFilter _inputFileFilter; // normal file filter
private FileFilter _ignoreFileFilter; // worker file filter (used if input suffix is null)
+
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java 2007-05-22 21:34:34 UTC (rev 12087)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/RemoteGatewayListener.java 2007-05-22 22:35:44 UTC (rev 12088)
@@ -81,6 +81,7 @@
}
+ @Override
protected void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException
{
// TODO: Implement. Very expensive though.
@@ -92,6 +93,7 @@
}
+ @Override
public boolean deleteFile(File file) throws GatewayException
{
RemoteFileSystem rfs = null;
@@ -118,6 +120,7 @@
}
}
+ @Override
byte[] getFileContents(File file) throws GatewayException
{
RemoteFileSystem rfs = null;
@@ -176,6 +179,7 @@
}
}
+ @Override
boolean renameFile(File from, File to) throws GatewayException
{
RemoteFileSystem rfs = null;
@@ -199,4 +203,31 @@
rfs.quit();
}
}
+ @Override
+ void bytesToFile(byte[] bytes, File file) throws GatewayException
+ {
+ RemoteFileSystem rfs = null;
+ try
+ {
+ EPR epr = ListenerUtil.assembleEpr(_config);
+ if (!(epr instanceof FTPEpr))
+ throw new Exception("This Gateway only accepts FTP and SFTP.");
+
+ FTPEpr ftpEpr = (FTPEpr) epr;
+ rfs = RemoteFileSystemFactory.getRemoteFileSystem(ftpEpr, true);
+ rfs.setRemoteDir(FtpClientUtil.fileToFtpString(_inputDirectory));
+
+ File tmpFile = File.createTempFile("RemoteGW", ".forUpload");
+ super.bytesToFile(bytes, tmpFile);
+ rfs.uploadFile(tmpFile, file.getName());
+ tmpFile.delete();
+
+ } catch (Exception e) {
+ throw new GatewayException(e);
+ } finally {
+ if (null != rfs)
+ rfs.quit();
+ }
+ }
+
}
More information about the jboss-svn-commits
mailing list