[jboss-svn-commits] JBL Code SVN: r7457 - in labs/jbossesb/trunk/product/core/listeners: src/org/jboss/soa/esb/listeners src/org/jboss/soa/esb/listeners/gateway tests/src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/listeners/gateway
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Nov 7 19:12:37 EST 2006
Author: estebanschifman
Date: 2006-11-07 19:12:30 -0500 (Tue, 07 Nov 2006)
New Revision: 7457
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
New FileGatewayListener class and unit test
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-11-08 00:12:30 UTC (rev 7457)
@@ -9,6 +9,7 @@
public static final String EPR_DESCRIPTION_TAG = "epr-description";
/** Gateways */
+ public static final String TARGET_SERVICE_CATEGORY_TAG = "target-service-category";
public static final String TARGET_SERVICE_NAME_TAG = "target-service-name";
/** ActionProcessingPipeline */
@@ -26,6 +27,9 @@
/** RuleSet reference (Filename) */
public static final String RULE_SET_TAG = "ruleSet";
+
+ /** Poller tagnames */
+ public static final String POLL_LATENCY_SECS_TAG = "pollLatencySeconds";
/** File Actions */
public static final String FILE_INPUT_DIR_TAG = "inputDir";
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListener.java 2006-11-08 00:12:30 UTC (rev 7457)
@@ -0,0 +1,346 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+import org.jboss.soa.esb.ConfigurationException;
+
+/**
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+public class FileGatewayListener implements Runnable
+{
+
+ public FileGatewayListener(GatewayListenerController commandListener, ConfigTree config)
+ throws Exception
+ {
+ _config = config;
+ _controller = commandListener;
+ _sleepBetweenPolls = 10000; // milliseconds
+ checkMyParms();
+ } // __________________________________
+
+ public void run()
+ {
+ if (null!=_serviceName)
+ try { _controller.register(_config,_myEpr); }
+ catch (RegistryException e1)
+ { _logger.warn("unable to register service",e1); }
+
+ boolean bSleep = false;
+ while (_controller.continueLooping())
+ {
+ // only sleep in between
+ if (bSleep)
+ try { Thread.sleep(_sleepBetweenPolls); }
+ catch (InterruptedException e) { return; }
+ else
+ bSleep = true;
+
+ for (File fileIn : _inputDirectory.listFiles(_fileFilter))
+ {
+ // Try to rename - if unsuccessful, somebody else got it first
+ File fileWork = new File(fileIn.getParent(),fileIn.getName()+_workingSuffix);
+ if (! fileIn.renameTo(fileWork))
+ continue;
+
+ Throwable thrown = null;
+ String text = null;
+ try
+ {
+ Object obj = _processMethod.invoke(_composer,new Object[] {fileWork} );
+ if (null==obj)
+ {
+ _logger.warn("Action class method <"+_processMethod.getName()+"> returned a null object");
+ continue;
+ }
+ _courier.deliver((org.jboss.soa.esb.message.Message)obj);
+ }
+
+ catch (InvocationTargetException e)
+ {
+ thrown = e;
+ text = "Problems invoking method <"+_processMethod.getName()+">";
+
+ }
+ catch (IllegalAccessException e)
+ {
+ thrown = e;
+ text = "Problems invoking method <"+_processMethod.getName()+">";
+ }
+ catch (ClassCastException e)
+ {
+ thrown = e;
+ text = "Action class method <"+_processMethod.getName()+"> returned a non Message object";
+ }
+ catch (CourierException e)
+ {
+ thrown = e;
+ text = "Courier <"+_courier.getClass().getName()+".deliver(Message) FAILED";
+ }
+
+ if (null==thrown)
+ {
+ File fileOK = new File(_postProcessDirectory,fileIn.getName()+_postProcessSuffix);
+ if (_deleteAfterOK)
+ fileWork.delete();
+ else
+ {
+ fileOK.delete();
+ fileWork.renameTo(fileOK);
+ }
+ }
+ else
+ {
+ thrown.printStackTrace();
+ _logger.error(text,thrown);
+ File fileError = new File(_errorDirectory,fileIn.getName()+_errorSuffix);
+ fileWork.renameTo(fileError);
+ }
+ }
+ }
+
+ if (null!=_serviceName)
+ try { _controller.unRegister(_serviceCategory, _serviceName,_myEpr); }
+ catch (RegistryException e1){ _logger.warn("unable to unRegister service",e1); }
+
+ } // ________________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws Exception
+ {
+ // Third arg is null - Exception will be thrown if attribute is not found
+ _targetServiceCategory = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ _targetEpr = _controller.getEprByName(_targetServiceName);
+ if (null==_targetEpr)
+ throw new ConfigurationException("EPR <"+_targetServiceName+"> not found in registry");
+ _courier = CourierFactory.getCourier(_targetEpr);
+
+ // Polling interval
+ String sAux = _config.getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ if (! Util.isNullString(sAux))
+ try { _sleepBetweenPolls = 1000 * Long.parseLong(sAux); }
+ catch (NumberFormatException e)
+ { _logger.warn("Invalid poll latency - keeping default of "+(_sleepBetweenPolls/1000)); }
+
+ resolveComposerClass();
+
+ // INPUT directory and suffix (used for FileFilter)
+ String sInpDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_INPUT_DIR_TAG,null);
+ _inputDirectory = fileFromString(sInpDir);
+ seeIfOkToWorkOnDir(_inputDirectory);
+
+ _inputSuffix = _controller.obtainAtt(_config,ListenerTagNames.FILE_INPUT_SFX_TAG,null);
+ _inputSuffix = _inputSuffix.trim();
+ if (_inputSuffix.length()<1)
+ throw new Exception ("Invalid "+ListenerTagNames.FILE_INPUT_SFX_TAG+" attribute");
+ _fileFilter = new FileEndsWith(_inputSuffix);
+
+ // WORK suffix (will rename in input directory)
+ _workingSuffix = _controller.obtainAtt(_config,ListenerTagNames.FILE_WORK_SFX_TAG,".esbWork").trim();
+ if (_workingSuffix.length()<1)
+ throw new Exception ("Invalid "+ListenerTagNames.FILE_WORK_SFX_TAG+" attribute");
+ if (_inputSuffix.equals(_workingSuffix))
+ throw new Exception("Work suffix must differ from input suffix <"+_workingSuffix+">");
+
+ // ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
+ String sErrDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_ERROR_DIR_TAG,sInpDir);
+ _errorDirectory = fileFromString(sErrDir);
+ seeIfOkToWorkOnDir(_errorDirectory);
+
+ _errorSuffix = _controller.obtainAtt(_config,ListenerTagNames.FILE_ERROR_SFX_TAG,".esbError").trim();
+ if (_errorSuffix.length()<1)
+ throw new Exception ("Invalid "+ListenerTagNames.FILE_ERROR_SFX_TAG+" attribute");
+ if (_errorDirectory.equals(_inputDirectory) && _inputSuffix.equals(_errorSuffix))
+ throw new Exception("Error suffix must differ from input suffix <"+_errorSuffix+">");
+
+
+ // Do users wish to delete files that were processed OK ?
+ String sPostDel = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_DEL_TAG,"false").trim();
+ _deleteAfterOK = Boolean.parseBoolean(sPostDel);
+ if (_deleteAfterOK)
+ return;
+
+ // POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
+ String sPostDir = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_DIR_TAG,sInpDir);
+ _postProcessDirectory = fileFromString(sPostDir);
+ seeIfOkToWorkOnDir(_postProcessDirectory);
+ _postProcessSuffix = _controller.obtainAtt(_config,ListenerTagNames.FILE_POST_SFX_TAG,".esbDone").trim();
+ if (_postProcessDirectory.equals(_inputDirectory))
+ { if (_postProcessSuffix.length()<1)
+ throw new Exception ("Invalid "+ListenerTagNames.FILE_POST_SFX_TAG+" attribute");
+ if (_postProcessSuffix.equals(_inputSuffix))
+ throw new Exception("Post process suffix must differ from input suffix <"+_postProcessSuffix+">");
+ }
+ } //________________________________
+
+ protected void resolveComposerClass() throws Exception
+ {
+ // Look for first "action" element - only first one will be used
+ String tagName = ListenerTagNames.ACTION_ELEMENT_TAG;
+ ConfigTree actionElement = _config.getFirstChild(tagName);
+ String sProcessMethod = null;
+ if (null!=actionElement)
+ { // class attribute
+ _composerName = _controller.obtainAtt(actionElement,ListenerTagNames.ACTION_CLASS_TAG,null);
+ _composerClass = Class.forName(_composerName);
+ Constructor oConst = _composerClass.getConstructor(new Class[] {ConfigTree.class});
+ _composer= oConst.newInstance(_config);
+ tagName = ListenerTagNames.PROCESS_METHOD_TAG;
+ sProcessMethod = _controller.obtainAtt(_config,tagName,tagName);
+ }
+ else
+ {
+ _composerName = PackageFileContents.class.getName();
+ _composerClass= PackageFileContents.class;
+ _composer = new PackageFileContents();
+ sProcessMethod = "process";
+ }
+
+ _processMethod = _composerClass.getMethod(sProcessMethod,new Class[] {Object.class});
+ } //________________________________
+
+ protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
+ {
+ if (! p_oDir.exists())
+ throw new Exception ("Directory "+p_oDir.toString()+" not found");
+ if (!p_oDir.isDirectory())
+ throw new Exception(p_oDir.toString()+" is not a directory");
+ if (!p_oDir.canRead())
+ throw new Exception("Can't read directory "+p_oDir.toString());
+ if (! p_oDir.canWrite())
+ throw new Exception ("Can't write/rename in directory "+p_oDir.toString());
+ } //________________________________
+
+ private File fileFromString(String file)
+ {
+ try { return new File(new URI(file)); }
+ catch(Exception e) { return new File(file); }
+ } //________________________________
+
+ private class FileEndsWith implements FileFilter
+ {
+ String m_sSuffix;
+ FileEndsWith(String p_sEnd) throws Exception
+ {
+ m_sSuffix = p_sEnd;
+ if (Util.isNullString(m_sSuffix))
+ throw new Exception("Must specify file extension");
+ } //______________________________
+
+ public boolean accept(File p_f)
+ { return (p_f.isFile())
+ ? p_f.toString().endsWith(m_sSuffix)
+ : false;
+ } //______________________________
+ } //____________________________________________________
+
+/**
+ * Default gateway action for files
+ * <p/>It will just drop the file contents into a Message
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+ private static class PackageFileContents
+ {
+ public Message process (Object obj) throws Exception
+ {
+ if (! (obj instanceof File))
+ throw new Exception ("Object must be instance of File");
+
+ Message message = MessageFactory.getInstance().getMessage();
+ message.getBody().setContents(getFileContent((File)obj));
+ return message;
+ }
+
+ private byte[] getFileContent(File file) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] ba = new byte[1000];
+ int iQread;
+
+ FileInputStream inp = new FileInputStream(file);
+ while (-1!= (iQread=inp.read(ba)))
+ if (iQread > 0)
+ out.write(ba,0,iQread);
+ inp.close();
+
+ out.close();
+ return out.toByteArray();
+ }
+ } //____________________________________________________
+
+ protected final static Logger _logger = Logger.getLogger(FileGatewayListener.class);
+
+ protected ConfigTree _config;
+ protected GatewayListenerController _controller;
+ protected long _sleepBetweenPolls; // milliseconds
+
+ protected String _serviceCategory, _serviceName;
+ protected String _targetServiceCategory ,_targetServiceName;
+ protected EPR _myEpr ,_targetEpr;
+
+ protected String _composerName;
+ protected Class _composerClass;
+ protected Object _composer;
+ protected Method _processMethod;
+
+ protected Courier _courier;
+
+ protected boolean _deleteAfterOK;
+ protected File _inputDirectory ,_errorDirectory ,_postProcessDirectory;
+ protected String _inputSuffix ,_postProcessSuffix
+ ,_workingSuffix ,_errorSuffix
+ ;
+ protected FileFilter _fileFilter;
+} //____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java 2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/GatewayListenerController.java 2006-11-08 00:12:30 UTC (rev 7457)
@@ -33,14 +33,17 @@
import org.jboss.internal.soa.esb.command.CommandQueue;
import org.jboss.internal.soa.esb.command.CommandQueueException;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.util.EPRManager;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.parameters.ParamRepositoryException;
import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXException;
@@ -104,11 +107,8 @@
return m_oException;
}
};
-
- private ActionDefinitionFactory actionDefinitionFactory;
-
/**
- * Package pivate default constructor.
+ * Package private default constructor.
*/
protected GatewayListenerController() { }
@@ -213,13 +213,6 @@
_endTime = (null == sEndT) ? Long.MAX_VALUE : _dateFormat.parse(
sEndT).getTime();
- // Read and initialise the action definitions...
- ConfigTree actionConfig = p_oP.getFirstChild("Actions");
- if(actionConfig == null) {
- throw new ConfigurationException("No 'Actions' configuration.");
- }
- actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
-
} // ________________________________
/**
@@ -265,12 +258,8 @@
while (endNotRequested())
{
_status = State.Running;
- for (ConfigTree oCurr : _config.getAllChildren()) {
- String sClass = oCurr.getAttribute(GATEWAY_CLASS_TAG);
- if (Util.isNullString(sClass))
- continue;
- tryToLaunchGateway(oCurr, sClass);
- }
+ for (ConfigTree oCurr : _config.getAllChildren())
+ tryToLaunchGateway(oCurr);
waitForCmdOrSleep();
@@ -301,24 +290,30 @@
.info("Finishing_____________________________________________________");
// Close the command queue...
- try {
- _commandQueue.close();
+ try
+ {
+ if (null!=_commandQueue)
+ _commandQueue.close();
} catch (CommandQueueException e) {
_logger.error("Error closing Command Queue.", e);
}
} // ________________________________
- private void tryToLaunchGateway(ConfigTree p_oP, String p_sClassName)
+ private void tryToLaunchGateway(ConfigTree p_oP)
{
+ String sClass = p_oP.getAttribute(GATEWAY_CLASS_TAG);
+ if (Util.isNullString(sClass))
+ return;
+
try {
- Class oListener = Class.forName(p_sClassName);
- Constructor oConst = oListener.getConstructor(new Class[] {
- this.getClass(), ConfigTree.class, ActionDefinitionFactory.class });
- Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
- p_oP, actionDefinitionFactory });
+ Class oListener = Class.forName(sClass);
+ Constructor oConst = oListener.getConstructor(new Class[]
+ {this.getClass(), ConfigTree.class});
+ Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this, p_oP});
new Thread(oRun).start();
} catch (Exception e) {
- _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ _logger.error("Cannot launch <" + sClass + ">\n", e);
+ e.printStackTrace();
}
} // ________________________________
@@ -327,6 +322,7 @@
} // ________________________________
private void waitForCmdOrSleep() {
+
long lToGo = millisToWait();
if (null == _commandQueue) {
@@ -334,14 +330,13 @@
// No command queue nor topic - Just sleep until time
// exhausted, or thread interrupted
try {
- if (lToGo > 0)
- Thread.sleep(lToGo);
+ while ((lToGo=millisToWait()) > 0)
+ Thread.sleep(500);
} catch (InterruptedException e) {
_endTime = 0; // mark as end requested and return
}
return;
}
-
// Wait for commands until time exhausted or command received
// Note that received commands might change time variables (reload/end)
// that's why time to go is recalculated on each cycle
@@ -436,6 +431,10 @@
return _endRequested || System.currentTimeMillis() >= _endTime;
}
+ public void requestEnd() {
+ _endRequested=true;
+ _endTime = 0;
+ }
/**
* Accessor to determine if execution time is not expired, and no shutdown
* request received
@@ -507,23 +506,72 @@
return getEprManager().loadEPR(serviceName);
} // ________________________________
- public void register (String serviceName, EPR address)
+
+ /**
+ * @deprecated use register (ConfigTree config, EPR address) instead.
+ * @param name
+ * @param address
+ */
+ public void register (String name, EPR address)
{
- try { getEprManager().saveEPR(serviceName,address); }
+ try { getEprManager().saveEPR(name,address); }
catch (IOException e)
{
_logger.fatal("Cannot register service",e);
}
} // ________________________________
-
- public void unRegister (String serviceName, EPR address)
+ /**
+ * @deprecated use unRegister (String serviceCategoryName, String serviceName, EPR epr) instead.
+ * @param name
+ */
+ public void unRegister (String name)
{
- try { getEprManager().removeEPR(serviceName); }
+ try { getEprManager().removeEPR(name); }
catch (IOException e)
{
_logger.fatal("Cannot un-register service",e);
}
} // ________________________________
+ /**
+ * Register an EPR in the registry.
+ *
+ * @param config - a config tree containing the deployment-configuration of the service.
+ * @param epr - the epr (EndPoint Reference) of the service.
+ *
+ * @throws RegistryException
+ */
+ public void register(ConfigTree config , EPR epr) throws RegistryException
+ {
+ String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ String serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+ {
+ register(serviceName,epr);
+ return;
+ }
+ String serviceDescription = config.getAttribute(ListenerTagNames.SERVICE_DESCRIPTION_TAG);
+ String eprDescription = config.getAttribute(ListenerTagNames.EPR_DESCRIPTION_TAG);
+ Registry registry = RegistryFactory.getRegistry();
+ registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+ }
+ /**
+ * Unregister the EPR from the registry.
+ *
+ * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
+ * @param serviceName - name of the service ("
+ * @param epr
+ * @throws RegistryException
+ */
+ public void unRegister(String serviceCategoryName, String serviceName , EPR epr) throws RegistryException
+ {
+ if ("eprManager".equalsIgnoreCase(serviceCategoryName))
+ {
+ unRegister(serviceName);
+ return;
+ }
+ Registry registry = RegistryFactory.getRegistry();
+ registry.unRegisterEPR(serviceCategoryName, serviceName, epr);
+ }
private CommandQueue _commandQueue;
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2006-11-08 00:12:30 UTC (rev 7457)
@@ -45,6 +45,7 @@
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
public class JmsGatewayListener implements Runnable
{
@@ -61,7 +62,9 @@
public void run()
{
if (null!=_serviceName)
- _controller.register(_serviceName,_myEpr);
+ try { _controller.register(_config,_myEpr); }
+ catch (RegistryException e1)
+ { _logger.warn("unable to register service",e1); }
while (_controller.continueLooping())
{
@@ -108,7 +111,9 @@
}
if (null!=_serviceName)
- _controller.unRegister(_serviceName,_myEpr);
+ try { _controller.unRegister(_serviceCategory, _serviceName,_myEpr); }
+ catch (RegistryException e1){ _logger.warn("unable to unRegister service",e1); }
+
if (null != _queueSession)
try { _queueSession.close(); }
@@ -128,6 +133,7 @@
protected void checkMyParms() throws Exception
{
// Third arg is null - Exception will be thrown if attribute is not found
+ _targetServiceCategory = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
_targetServiceName = _controller.obtainAtt(_config, ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
_targetEpr = _controller.getEprByName(_targetServiceName);
if (null==_targetEpr)
@@ -229,7 +235,8 @@
protected GatewayListenerController _controller;
protected final long _sleepForRetries; // milliseconds
- protected String _serviceName,_targetServiceName;
+ protected String _serviceCategory ,_serviceName;
+ protected String _targetServiceCategory ,_targetServiceName;
protected EPR _myEpr ,_targetEpr;
protected String _composerName;
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java 2006-11-07 23:42:58 UTC (rev 7456)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/FileGatewayListenerUnitTest.java 2006-11-08 00:12:30 UTC (rev 7457)
@@ -0,0 +1,149 @@
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.tests.BaseTest;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.EsbListenerController;
+import org.jboss.soa.esb.message.Message;
+
+public class FileGatewayListenerUnitTest extends BaseTest
+{
+
+ private static final String TMP_DIR = System.getProperty("java.io.tmpdir","/tmp");
+
+ static File _gatewayConfig = new File(TMP_DIR,"gatewayConfig.xml");
+ static File _esbListenerConfig = new File(TMP_DIR,"esbListenerConfig.xml");
+ static File _returnFile = new File(TMP_DIR,"messageBack.txt");
+ static File _droppedFile = new File(TMP_DIR,"legacy.dat");
+ static String POST_SUFFIX = ".sentToEsb";
+ static File _doneFile = new File(_droppedFile.toString()+POST_SUFFIX);
+
+
+ static String THE_TEXT = "This is the text that will travel around";
+
+ String _appServer = System.getProperty("unit.test.appserver","localhost");
+ EsbListenerController _esbListController;
+ GatewayListenerController _gatewayController;
+
+ public void setUp() throws Exception
+ {
+ Logger.getLogger(this.getClass()).info("Writing temp files to "+TMP_DIR);
+ writeGatewayConfig();
+ writeEsbListenerConfig();
+ bytesToFile(_droppedFile, THE_TEXT.getBytes());
+ _doneFile.delete();
+ _returnFile.delete();
+ }
+
+ public void tearDown() throws Exception
+ {
+ _gatewayConfig.delete();
+ _esbListenerConfig.delete();
+ _doneFile.delete();
+ _returnFile.delete();
+ }
+
+ public void test_FileGatewayListener() throws Exception
+ {
+ try
+ {
+ _esbListController = new EsbListenerController(_esbListenerConfig.toString());
+ new Thread(_esbListController).start();
+ Thread.sleep(2000);
+ _gatewayController = new GatewayListenerController(_gatewayConfig.toString());
+ new Thread(_gatewayController).start();
+ Thread.sleep(4000);
+
+ _esbListController.requestEnd();
+ _gatewayController.requestEnd();
+
+ assertEquals(THE_TEXT, stringFromFile(_returnFile));
+ assertEquals(true,_doneFile.exists());
+ }
+ catch (Exception e) { }
+ }
+
+ public static class MockAction
+ {
+ ConfigTree _config;
+ public MockAction(ConfigTree config) { _config = config; }
+ public Message writeToDisk (Message message) throws Exception
+ {
+ bytesToFile(_returnFile, message.getBody().getContents());
+ return message;
+ } // ________________________________
+
+ } // ___________________________________________________
+
+ void writeGatewayConfig() throws Exception
+ {
+ StringBuilder sb = new StringBuilder()
+ .append("<FileGateway parameterReloadSecs=\"180\">\n")
+ .append(" <GatewayConfig\n")
+ .append(" target-service-category=\"eprManager\"\n")
+ .append(" target-service-name=\"testService\"\n")
+ .append(" gatewayClass=\"org.jboss.soa.esb.listeners.gateway.FileGatewayListener\"\n")
+ .append(" pollLatencySeconds=\"5\"\n")
+ .append(" inputDir=\"").append(TMP_DIR).append("\"\n")
+ .append(" inputSuffix=\".dat\"\n")
+ .append(" postSuffix=\"").append(POST_SUFFIX).append("\"\n")
+ .append(" >\n")
+ .append(" </GatewayConfig>\n")
+ .append("</FileGateway>\n")
+ ;
+ bytesToFile(_gatewayConfig, sb.toString().getBytes());
+ }
+
+ void writeEsbListenerConfig() throws Exception
+ {
+ StringBuilder sb = new StringBuilder()
+ .append("<DummyTester parameterReloadSecs=\"180\">\n")
+ .append(" <DummyActionConfig\n")
+ .append(" service-category=\"eprManager\"\n")
+ .append(" service-name=\"testService\"\n")
+ .append(" service-description=\"My Dummy Service Name\"\n")
+ .append(" epr-description=\"EPR descriptionnnnnnnnnnnnnnn\"\n")
+ .append(" listenerClass=\"org.jboss.soa.esb.listeners.message.JmsQueueListener\"\n")
+ .append(" connection-factory=\"ConnectionFactory\"\n")
+ .append(" destination-type=\"queue\"\n")
+ .append(" destination-name=\"queue/A\"\n")
+ .append(" jndi-type=\"jboss\"\n")
+ .append(" jndi-URL=\"").append(_appServer).append("\"\n")
+ .append(" message-selector=\"service='test'\"\n")
+ .append(" >\n")
+ .append(" <action class=\"").append(MockAction.class.getName()).append("\" process=\"writeToDisk\" />\n")
+ .append(" </DummyActionConfig>\n")
+ .append("</DummyTester>\n")
+ ;
+ bytesToFile(_esbListenerConfig, sb.toString().getBytes());
+ }
+
+ static void bytesToFile(File file, byte[] text) throws Exception
+ {
+ OutputStream out = new FileOutputStream(file);
+ out.write(text);
+ out.close();
+ }
+
+ static String stringFromFile(File file) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ FileInputStream inp = new FileInputStream(file);
+ byte[] ba = new byte[1000];
+ int iQ;
+ while (-1 != (iQ=inp.read(ba)))
+ if (iQ > 0)
+ out.write(ba,0,iQ);
+ inp.close();
+
+ out.close();
+ return out.toString();
+ }
+}
More information about the jboss-svn-commits
mailing list