[jboss-svn-commits] JBL Code SVN: r13221 - in labs/jbossesb/trunk/product: install/conf and 13 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Jul 7 16:46:12 EDT 2007
Author: mark.little at jboss.com
Date: 2007-07-07 16:46:11 -0400 (Sat, 07 Jul 2007)
New Revision: 13221
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/GatewayFilter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GatewayFilterUnitTest.java
Removed:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/metadata/
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/couriers/filter/
Modified:
labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
labs/jbossesb/trunk/product/install/conf/jbossesb-properties.xml
labs/jbossesb/trunk/product/install/tomcat/jbossesb-properties.xml
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/MetaDataFilter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/naming/FileNameGeneratorAction.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/InputOutputFilter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/FileCourierUnitTest.java
labs/jbossesb/trunk/product/services/jbossesb/src/test/resources/jbossesb-unittest-properties.xml
Log:
http://jira.jboss.com/jira/browse/JBESB-663
Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)
Modified: labs/jbossesb/trunk/product/install/conf/jbossesb-properties.xml
===================================================================
--- labs/jbossesb/trunk/product/install/conf/jbossesb-properties.xml 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/install/conf/jbossesb-properties.xml 2007-07-07 20:46:11 UTC (rev 13221)
@@ -86,6 +86,7 @@
</properties>
<properties name="filters">
- <property name="org.jboss.soa.esb.courier.filter.1" value="org.jboss.internal.soa.esb.message.metadata.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.1" value="org.jboss.internal.soa.esb.message.filter.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.2" value="org.jboss.internal.soa.esb.message.filter.GatewayFilter"/>
</properties>
</esb>
Modified: labs/jbossesb/trunk/product/install/tomcat/jbossesb-properties.xml
===================================================================
--- labs/jbossesb/trunk/product/install/tomcat/jbossesb-properties.xml 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/install/tomcat/jbossesb-properties.xml 2007-07-07 20:46:11 UTC (rev 13221)
@@ -87,6 +87,7 @@
<property name="org.jboss.soa.esb.routing.cbrClass" value="org.jboss.internal.soa.esb.services.routing.cbr.JBossRulesRouter"/>
</properties>
<properties name="filters">
- <property name="org.jboss.soa.esb.courier.filter.1" value="org.jboss.internal.soa.esb.message.metadata.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.1" value="org.jboss.internal.soa.esb.message.filter.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.2" value="org.jboss.internal.soa.esb.message.filter.GatewayFilter"/>
</properties>
</esb>
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -37,7 +37,7 @@
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.TwoWayCourier;
-import org.jboss.soa.esb.couriers.filter.FilterManager;
+import org.jboss.soa.esb.filter.FilterManager;
import org.jboss.soa.esb.message.Message;
/**
@@ -183,7 +183,7 @@
try
{
- message = FilterManager.getInstance().doOutputWork(message);
+ message = FilterManager.getInstance().doOutputWork(message, null);
return _deliverCourier.deliver(message);
}
@@ -222,7 +222,7 @@
throw new CourierException("No courier defined for pick ups");
final Message result = courier.pickup(waitTime);
- return (result == null ? null : FilterManager.getInstance().doInputWork(result)) ;
+ return (result == null ? null : FilterManager.getInstance().doInputWork(result, null)) ;
}
public void cleanup ()
Copied: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter (from rev 13189, labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/metadata)
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/GatewayFilter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/GatewayFilter.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/GatewayFilter.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.message.filter;
+
+import java.io.File;
+import java.util.Map;
+
+import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.filter.FilterManager;
+import org.jboss.soa.esb.filter.InputOutputFilter;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * An inputoutput filter is called when the message is being received/sent and
+ * after it has been received from the transport/before it is passed to the
+ * transport. Implementations may augment or modify the message en route.
+ * Implementations that are pure output or pure input can just override the
+ * desired operation.
+ *
+ * This one adds some metadata to the message as it flows through the ESB.
+ *
+ * @author marklittle
+ */
+
+public class GatewayFilter extends InputOutputFilter
+{
+ public Message onOutput (Message msg, Map<String, Object> params)
+ throws CourierException
+ {
+ if (params != null)
+ {
+ /*
+ * We don't need to check to see if we're deployed on a gateway.
+ * If we aren't, then none of these objects will be present
+ * anyway!
+ */
+
+ File inFile = (File) params.get(Environment.ORIGINAL_FILE);
+
+ if (inFile != null)
+ msg.getProperties().setProperty(
+ Environment.ORIGINAL_FILE_NAME_MSG_PROP,
+ inFile.getName());
+
+ ConfigTree config = (ConfigTree) params
+ .get(Environment.GATEWAY_CONFIG);
+
+ if (config != null)
+ {
+ try
+ {
+ String url = ListenerUtil.obtainAtt(config, JDBCEpr.URL_TAG,
+ null);
+
+ if (url != null)
+ msg.getProperties().setProperty(
+ Environment.ORIGINAL_URL_PROP, url);
+ }
+ catch (Throwable ex)
+ {
+ FilterManager._logger.warn("GatewayFilter encountered problem during JDBC config check.", ex);
+ }
+
+ try
+ {
+ String queueName = ListenerUtil.obtainAtt(config,
+ JMSEpr.DESTINATION_NAME_TAG, null);
+
+ if (queueName != null)
+ msg.getProperties()
+ .setProperty(
+ Environment.ORIGINAL_QUEUE_NAME_MSG_PROP,
+ queueName);
+ }
+ catch (Throwable ex)
+ {
+ FilterManager._logger.warn("GatewayFilter encountered problem during JMS config check.", ex);
+ }
+ }
+ }
+
+ return msg;
+ }
+}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/MetaDataFilter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/metadata/MetaDataFilter.java 2007-07-06 20:07:13 UTC (rev 13189)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/MetaDataFilter.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -20,12 +20,13 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.internal.soa.esb.message.metadata;
+package org.jboss.internal.soa.esb.message.filter;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.Calendar;
+import java.util.Map;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.FTPEpr;
@@ -36,15 +37,16 @@
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.filter.InputOutputFilter;
+import org.jboss.soa.esb.filter.InputOutputFilter;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.Properties;
/**
- * An inputoutput filter is called when the message is being received/sent and after it
- * has been received from the transport/before it is passed to the transport.
- * Implementations may augment or modify the message en route. Implementations that
- * are pure output or pure input can just override the desired operation.
+ * An inputoutput filter is called when the message is being received/sent and
+ * after it has been received from the transport/before it is passed to the
+ * transport. Implementations may augment or modify the message en route.
+ * Implementations that are pure output or pure input can just override the
+ * desired operation.
*
* This one adds some metadata to the message as it flows through the ESB.
*
@@ -53,101 +55,111 @@
public class MetaDataFilter extends InputOutputFilter
{
- public Message onOutput (Message msg) throws CourierException
+ public Message onOutput (Message msg, Map<String, Object> params) throws CourierException
+ {
+ final Environment.Transports type;
+ final String name;
+ EPR destination = msg.getHeader().getCall().getTo();
+ if (destination != null)
{
- final Environment.Transports type ;
- final String name ;
- EPR destination = msg.getHeader().getCall().getTo();
- if (destination != null)
- {
- if (destination instanceof FTPEpr)
- {
- try
- {
- type = Environment.Transports.FTP;
-
- String dir = ModulePropertyManager.getPropertyManager(
- ModulePropertyManager.TRANSPORTS_MODULE).getProperty(
- Environment.FTP_LOCALDIR, System.getProperty("java.io.tmpdir"));
-
- name = dir + File.separator + msg.getHeader().getCall().getMessageID().toString() +
- ((FTPEpr) destination).getPostDirectory();
- }
- catch (URISyntaxException ex)
- {
- throw new CourierException(ex);
- }
- }
- else if (destination instanceof FileEpr)
- {
- try
- {
- type = Environment.Transports.File;
-
- name = ((FileEpr) destination).getURL() + File.separator +
- msg.getHeader().getCall().getMessageID().toString() + ((FileEpr) destination).getPostSuffix();
- }
- catch (MalformedURLException ex)
- {
- throw new CourierException(ex);
- }
- catch (URISyntaxException ex)
- {
- throw new CourierException(ex);
- }
- }
- else if (destination instanceof JMSEpr)
- {
- type = Environment.Transports.JMS;
-
- name = destination.getAddr().toString();
- }
- else if (destination instanceof JDBCEpr)
- {
- type = Environment.Transports.SQL;
-
- name = destination.getAddr().toString();
- }
- else if (destination instanceof HibernateEpr)
- {
- type = Environment.Transports.Hibernate;
- name = destination.getAddr().toString();
- }
- else
- {
- type = null ;
- name = null ;
- }
- }
- else
- {
- type = null ;
- name = null ;
- }
-
- final Properties props = msg.getProperties() ;
- setProperty(props, Environment.TRANSPORT_TYPE, type) ;
- setProperty(props, Environment.MESSAGE_SOURCE, name);
- props.setProperty(Environment.MESSAGE_ENTRY_TIME, Calendar.getInstance().getTime().toString());
- return msg;
- }
-
- public Message onInput (Message msg) throws CourierException
- {
- msg.getProperties().setProperty(Environment.MESSAGE_EXIT_TIME, Calendar.getInstance().getTime().toString());
-
- return msg;
- }
-
- private void setProperty(final Properties props, final String name, final Object value)
- {
- if (value == null)
+ if (destination instanceof FTPEpr)
+ {
+ try
{
- props.remove(name) ;
+ type = Environment.Transports.FTP;
+
+ String dir = ModulePropertyManager.getPropertyManager(
+ ModulePropertyManager.TRANSPORTS_MODULE)
+ .getProperty(Environment.FTP_LOCALDIR,
+ System.getProperty("java.io.tmpdir"));
+
+ name = dir
+ + File.separator
+ + msg.getHeader().getCall().getMessageID()
+ .toString()
+ + ((FTPEpr) destination).getPostDirectory();
}
- else
+ catch (URISyntaxException ex)
{
- props.setProperty(name, value) ;
+ throw new CourierException(ex);
}
+ }
+ else if (destination instanceof FileEpr)
+ {
+ try
+ {
+ type = Environment.Transports.File;
+
+ name = ((FileEpr) destination).getURL()
+ + File.separator
+ + msg.getHeader().getCall().getMessageID()
+ .toString()
+ + ((FileEpr) destination).getPostSuffix();
+ }
+ catch (MalformedURLException ex)
+ {
+ throw new CourierException(ex);
+ }
+ catch (URISyntaxException ex)
+ {
+ throw new CourierException(ex);
+ }
+ }
+ else if (destination instanceof JMSEpr)
+ {
+ type = Environment.Transports.JMS;
+
+ name = destination.getAddr().toString();
+ }
+ else if (destination instanceof JDBCEpr)
+ {
+ type = Environment.Transports.SQL;
+
+ name = destination.getAddr().toString();
+ }
+ else if (destination instanceof HibernateEpr)
+ {
+ type = Environment.Transports.Hibernate;
+ name = destination.getAddr().toString();
+ }
+ else
+ {
+ type = null;
+ name = null;
+ }
}
+ else
+ {
+ type = null;
+ name = null;
+ }
+
+ final Properties props = msg.getProperties();
+ setProperty(props, Environment.TRANSPORT_TYPE, type);
+ setProperty(props, Environment.MESSAGE_SOURCE, name);
+ props.setProperty(Environment.MESSAGE_ENTRY_TIME, Calendar
+ .getInstance().getTime().toString());
+ return msg;
+ }
+
+ public Message onInput (Message msg, Map<String, Object> params) throws CourierException
+ {
+ msg.getProperties().setProperty(Environment.MESSAGE_EXIT_TIME,
+ Calendar.getInstance().getTime().toString());
+
+ return msg;
+ }
+
+ private void setProperty (final Properties props, final String name,
+ final Object value)
+ {
+ if (value == null)
+ {
+ props.remove(name);
+ }
+ else
+ {
+ props.setProperty(name, value);
+ }
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/naming/FileNameGeneratorAction.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/naming/FileNameGeneratorAction.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/naming/FileNameGeneratorAction.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -32,6 +32,7 @@
import org.jboss.soa.esb.actions.ActionProcessingException;
import org.jboss.soa.esb.actions.BeanConfiguredAction;
import org.jboss.soa.esb.actions.naming.strategy.FileNamingStrategy;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.listeners.gateway.AbstractFileGateway;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.ClassUtil;
@@ -51,7 +52,7 @@
BeanConfiguredAction {
/** Property name for incoming file name */
- private String fileNameProperty = AbstractFileGateway.ORIGINAL_FILE_NAME_MSG_PROP;
+ private String fileNameProperty = Environment.ORIGINAL_FILE_NAME_MSG_PROP;
/** Property name for filename after processing */
private String resultProperty = DEFAULT_RESULT_PROPERTY;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -129,7 +129,7 @@
public static final String MSG_STORE_JCR_ROOT_NODE_PATH = "org.jboss.soa.esb.persistence.jcr.root.node.path";
/*
- * Some message properties.
+ * Some message metadata properties.
*/
public static final String TRANSPORT_TYPE = "org.jboss.soa.esb.message.transport.type";
@@ -137,7 +137,20 @@
public static final String MESSAGE_ENTRY_TIME = "org.jboss.soa.esb.message.time.dob"; // time born
public static final String MESSAGE_EXIT_TIME = "org.jboss.soa.esb.message.time.dod"; // time died
+ /** Message property name for original filename */
+
+ public static final String ORIGINAL_FILE_NAME_MSG_PROP = "org.jboss.soa.esb.gateway.original.file.name";
+ public static final String ORIGINAL_QUEUE_NAME_MSG_PROP = "org.jboss.soa.esb.gateway.original.queue.name";
+ public static final String ORIGINAL_URL_PROP = "org.jboss.soa.esb.gateway.original.url";
+
/*
+ * Some gateway attachment names.
+ */
+
+ public static final String ORIGINAL_FILE = "org.jboss.soa.esb.gateway.file";
+ public static final String GATEWAY_CONFIG = "org.jboss.soa.esb.gateway.config";
+
+ /*
* Some JMS specific message element names.
*/
@@ -146,11 +159,11 @@
/*
* Filter properties should be identified by:
*
- * org.jboss.soa.esb.courier.filter.<number>
+ * org.jboss.soa.esb.filter.<number>
*
* and will be called in increasing order of <number>. Same <number> may be
* called arbitrarily.
*/
- public static final String FILTER_NAME = "org.jboss.soa.esb.courier.filter";
+ public static final String FILTER_NAME = "org.jboss.soa.esb.filter";
}
Copied: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter (from rev 13189, labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/couriers/filter)
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/couriers/filter/FilterManager.java 2007-07-06 20:07:13 UTC (rev 13189)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -20,14 +20,16 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.soa.esb.couriers.filter;
+package org.jboss.soa.esb.filter;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
+import org.apache.log4j.Logger;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.couriers.CourierException;
@@ -37,145 +39,157 @@
import com.arjuna.common.util.propertyservice.PropertyManager;
/**
- * The FilterManager process messages through the various input and
- * output filters that have been registered via the configuration file.
- * Currently this is the same processing for all services/clients, i.e., there
- * is no per-process/per-client processing, it is all at the ESB instance
- * level.
+ * The FilterManager process messages through the various input and output
+ * filters that have been registered via the configuration file. Currently this
+ * is the same processing for all services/clients, i.e., there is no
+ * per-process/per-client processing, it is all at the ESB instance level.
*
* @author marklittle
*/
public class FilterManager
{
- public static final synchronized FilterManager getInstance ()
+ public static final synchronized FilterManager getInstance ()
+ {
+ if (_instance == null)
+ _instance = new FilterManager();
+
+ return _instance;
+ }
+
+ /**
+ * Iterate through the registered output filters, allowing them to
+ * augment/modify the message. The first error causes the iteration to
+ * stop and the send to fail.
+ *
+ * @param msg
+ * the original message.
+ * @return the resultant message.
+ * @throws CourierException
+ * thrown if there is any error during processing.
+ */
+
+ public Message doOutputWork (Message msg, Map<String, Object> params)
+ throws CourierException
+ {
+ if ((_filters == null) || (msg == null))
+ return msg;
+
+ for (int i = 0; i < _filters.length; i++)
{
- if (_instance == null)
- _instance = new FilterManager();
-
- return _instance;
+ if (_filters[i] != null)
+ msg = _filters[i].onOutput(msg, params);
}
-
- /**
- * Iterate through the registered output filters, allowing them to augment/modify
- * the message. The first error causes the iteration to stop and the send to fail.
- *
- * @param msg the original message.
- * @return the resultant message.
- * @throws CourierException thrown if there is any error during processing.
- */
-
- public Message doOutputWork (Message msg) throws CourierException
+
+ return msg;
+ }
+
+ /**
+ * Iterate through the registered input filters, allowing them to
+ * augment/modify the message. The first error causes the iteration to
+ * stop and the receive to fail.
+ *
+ * @param msg
+ * the original message.
+ * @return the resultant message.
+ * @throws CourierException
+ * thrown if there is any error during processing.
+ */
+
+ public Message doInputWork (Message msg, Map<String, Object> params)
+ throws CourierException
+ {
+ if ((_filters == null) || (msg == null))
+ return msg;
+
+ for (int i = 0; i < _filters.length; i++)
{
- if ((_filters == null) || (msg == null))
- return msg;
-
- for (int i = 0; i < _filters.length; i++)
- {
- if (_filters[i] != null)
- msg = _filters[i].onOutput(msg);
- }
-
- return msg;
+ if (_filters[i] != null)
+ msg = _filters[i].onInput(msg, params);
}
-
- /**
- * Iterate through the registered input filters, allowing them to augment/modify
- * the message. The first error causes the iteration to stop and the receive to fail.
- *
- * @param msg the original message.
- * @return the resultant message.
- * @throws CourierException thrown if there is any error during processing.
- */
-
- public Message doInputWork (Message msg) throws CourierException
+
+ return msg;
+ }
+
+ private FilterManager()
+ {
+ PropertyManager pm = ModulePropertyManager
+ .getPropertyManager(ModulePropertyManager.FILTER_MODULE);
+ Properties props = pm.getProperties();
+
+ if (props != null)
{
- if ((_filters == null) || (msg == null))
- return msg;
-
- for (int i = 0; i < _filters.length; i++)
+ Enumeration names = props.propertyNames();
+ TreeMap<Integer, String> map = new TreeMap<Integer, String>();
+
+ /*
+ * Go through the list of attributes and pull out those that
+ * refer to filters. Make sure we order them according to their
+ * names.
+ */
+
+ while (names.hasMoreElements())
+ {
+ String attrName = (String) names.nextElement();
+
+ if (attrName.startsWith(Environment.FILTER_NAME))
{
- if (_filters[i] != null)
- msg = _filters[i].onInput(msg);
+ String order = attrName.substring(Environment.FILTER_NAME
+ .length() + 1);
+ Integer value;
+
+ try
+ {
+ value = new Integer(order);
+ }
+ catch (NumberFormatException ex)
+ {
+ throw new RuntimeException("Filter name " + order
+ + " is invalid!");
+ }
+
+ map.put(value, props.getProperty(attrName));
}
-
- return msg;
- }
-
- private FilterManager ()
- {
- PropertyManager pm = ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE);
- Properties props = pm.getProperties();
+ }
- if (props != null)
+ if (map.size() > 0)
+ {
+ Collection<String> ordered = map.values();
+ Iterator<String> iter = ordered.iterator();
+ int index = 0;
+
+ _filters = new InputOutputFilter[map.size()];
+
+ while (iter.hasNext())
{
- Enumeration names = props.propertyNames();
- TreeMap<Integer, String> map = new TreeMap<Integer, String>();
-
- /*
- * Go through the list of attributes and pull out those that refer
- * to filters. Make sure we order them according to their names.
- */
-
- while (names.hasMoreElements())
- {
- String attrName = (String) names.nextElement();
+ String filterName = iter.next();
- if (attrName.startsWith(Environment.FILTER_NAME))
- {
- String order = attrName.substring(Environment.FILTER_NAME.length()+1);
- Integer value;
-
- try
- {
- value = new Integer(order);
- }
- catch (NumberFormatException ex)
- {
- throw new RuntimeException("Filter name "+order+" is invalid!");
- }
-
- map.put(value, props.getProperty(attrName));
- }
- }
-
- if (map.size() > 0)
- {
- Collection<String> ordered = map.values();
- Iterator<String> iter = ordered.iterator();
- int index = 0;
-
- _filters = new InputOutputFilter[map.size()];
-
- while (iter.hasNext())
- {
- String filterName = iter.next();
-
- try
- {
- Class c = ClassUtil.forName(filterName, getClass());
- InputOutputFilter theFilter = (InputOutputFilter) c.newInstance();
+ try
+ {
+ Class c = ClassUtil.forName(filterName, getClass());
+ InputOutputFilter theFilter = (InputOutputFilter) c
+ .newInstance();
- _filters[index++] = theFilter;
- }
- catch (ClassNotFoundException ex)
- {
- ex.printStackTrace();
- }
- catch (IllegalAccessException ex)
- {
- ex.printStackTrace();
- }
- catch (InstantiationException ex)
- {
- ex.printStackTrace();
- }
- }
- }
+ _filters[index++] = theFilter;
+ }
+ catch (ClassNotFoundException ex)
+ {
+ _logger.warn("FilterManager problem loading class.", ex);
+ }
+ catch (Throwable ex)
+ {
+ _logger.warn("FilterManager problem during load.", ex);
+ }
}
+ }
}
-
- private InputOutputFilter[] _filters;
-
- private static FilterManager _instance = null;
+ }
+
+ private InputOutputFilter[] _filters;
+
+ // It's public so all filters can use it if necessary.
+
+ public final static Logger _logger = Logger.getLogger(FilterManager.class);
+
+ private static FilterManager _instance = null;
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/InputOutputFilter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/couriers/filter/InputOutputFilter.java 2007-07-06 20:07:13 UTC (rev 13189)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/InputOutputFilter.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -20,8 +20,10 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.soa.esb.couriers.filter;
+package org.jboss.soa.esb.filter;
+import java.util.Map;
+
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.message.Message;
@@ -35,16 +37,17 @@
*/
public class InputOutputFilter
-{
+{
/**
* Called as the message flows towards the transport.
*
* @param msg the message
+ * @param params additional information
* @return an augmented message (or the original message)
* @throws CourierException thrown if an error occurs.
*/
- public Message onOutput (Message msg) throws CourierException
+ public Message onOutput (Message msg, Map<String, Object> params) throws CourierException
{
return msg;
}
@@ -53,11 +56,12 @@
* Called immediately after the message is received from the transport.
*
* @param msg the message
+ * @param params additional information
* @return an augmented message (or the original message)
* @throws CourierException thrown if an error occurs.
*/
- public Message onInput (Message msg) throws CourierException
+ public Message onInput (Message msg, Map<String, Object> params) throws CourierException
{
return msg;
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -31,18 +31,22 @@
import java.net.URI;
import java.net.URL;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.filter.FilterManager;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
@@ -66,547 +70,590 @@
* @since Version 4.0
*
*/
-public abstract class AbstractFileGateway extends AbstractThreadedManagedLifecycle
+public abstract class AbstractFileGateway extends
+ AbstractThreadedManagedLifecycle
{
- abstract File[] getFileList() throws GatewayException;
+ abstract File[] getFileList () throws GatewayException;
- abstract byte[] getFileContents(File file) throws GatewayException;
+ abstract byte[] getFileContents (File file) throws GatewayException;
- abstract boolean renameFile(File from, File to) throws GatewayException;
+ abstract boolean renameFile (File from, File to) throws GatewayException;
- abstract boolean deleteFile(File file) throws GatewayException;
+ abstract boolean deleteFile (File file) throws GatewayException;
- abstract void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException;
+ abstract void seeIfOkToWorkOnDir (File p_oDir) throws GatewayException;
- abstract void getDefaultComposer() throws GatewayException;
+ abstract void getDefaultComposer () throws GatewayException;
- abstract void bytesToFile(byte[] bytes, File file) throws GatewayException;
+ abstract void bytesToFile (byte[] bytes, File file) throws GatewayException;
- protected AbstractFileGateway(ConfigTree config) throws ConfigurationException, RegistryException, GatewayException
- {
- super(config) ;
- _config = config;
- _sleepBetweenPolls = 10000; // milliseconds
- checkMyParms();
- } // __________________________________
-
- /**
+ protected AbstractFileGateway(ConfigTree config)
+ throws ConfigurationException, RegistryException, GatewayException
+ {
+ super(config);
+ _config = config;
+ _sleepBetweenPolls = 10000; // milliseconds
+ checkMyParms();
+ } // __________________________________
+
+ /**
* Handle the initialisation of the managed instance.
*
- * @throws ManagedLifecycleException for errors while initialisation.
+ * @throws ManagedLifecycleException
+ * for errors while initialisation.
*/
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected registry exception", re) ;
- }
- }
+ protected void doInitialise () throws ManagedLifecycleException
+ {
+ try
+ {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
+ }
- /**
+ /**
* Execute on the thread.
*/
- protected void doRun()
- {
+ protected void doRun ()
+ {
- EPR replyEpr = null;
- Message replyMsg = null;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
+ EPR replyEpr = null;
+ Message replyMsg = null;
- do {
- File[] fileList;
- try {
- fileList = getFileList();
- } catch (GatewayException e) {
- _logger.error("Can't retrieve file list", e);
- continue;
- }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
- for (File fileIn : fileList)
- {
- // Try to rename - if unsuccessful, somebody else got it first
- File fileWork = getWorkFileName( fileIn, _workingSuffix);
- try
- {
- if (!renameFile(fileIn, fileWork))
- continue;
- }
- catch (GatewayException e)
- {
- _logger.error("Problems renaming file " + fileIn + " to "
- + fileWork);
- continue;
- }
+ do
+ {
+ File[] fileList;
+ try
+ {
+ fileList = getFileList();
+ }
+ catch (GatewayException e)
+ {
+ _logger.error("Can't retrieve file list", e);
+ continue;
+ }
- Throwable thrown = null;
- String text = null;
- try
- {
- Object obj = _processMethod.invoke(_composer, new Object[]
- { fileWork });
- if (null == obj)
- {
- _logger.warn("Action class method <"
- + _processMethod.getName()
- + "> returned a null object");
- continue;
- }
- boolean bSent = false;
-
- Message outMessage = (Message) obj;
- outMessage.getProperties().setProperty(ORIGINAL_FILE_NAME_MSG_PROP, fileIn.getName());
- for (EPR current : _targetEprs)
- {
- if (current instanceof FileEpr)
- {
- try
- {
- FileEpr fpr = (FileEpr) current;
- FileEpr newEpr = new FileEpr(fpr.getURL());
- newEpr.setPostDelete(false);
- newEpr.setPostDirectory(fpr.getURL().getFile());
- newEpr.setPostSuffix(fpr.getInputSuffix());
- current = newEpr;
- }
- catch (Exception e)
- {
- _logger.error("Problems with file EPR", e); }
- }
- _courier = getCourier(current);
- try
- {
- replyEpr = null;
- outMessage.getHeader().getCall().setTo(current);
- if (_maxMillisForResponse>0)
- {
- replyEpr = CourierUtil.getDefaultReplyToEpr(current);
- outMessage.getHeader().getCall().setReplyTo(replyEpr);
- }
- if (_courier.deliver(outMessage))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier) ;
- }
-
- }
- if (!bSent)
- {
- text = "Target service <" + _targetServiceCategory
- + "," + _targetServiceName
- + "> is not registered";
- thrown = new Exception(text);
- }
- else if (null!=replyEpr)
- {
- TwoWayCourier replyCourier = CourierFactory.getPickupCourier(replyEpr);
- try
- {
- replyMsg = replyCourier.pickup(_maxMillisForResponse);
- _responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
- }
- catch (CourierTimeoutException e)
- {
- thrown = e;
- text = "Expected response was not received from invoked service";
- replyMsg = MessageFactory.getInstance().getMessage();
- String timedOut = "Service <"
- +_targetServiceCategory+","+"_targetServiceName"
- +"> timed out without sending response";
- replyMsg.getBody().setByteArray(timedOut.getBytes());
- _responderMethod.invoke(_composer, new Object[] {replyMsg,fileIn});
- }
- finally
- {
- if (null!=replyCourier)
- CourierUtil.cleanCourier(replyCourier);
- }
- }
- }
- catch (InvocationTargetException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
-
- }
- catch (IllegalAccessException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
- }
- catch (ClassCastException e)
- {
- thrown = e;
- text = "Action class method <" + _processMethod.getName()
- + "> returned a non Message object";
- }
- catch (CourierException e)
- {
- thrown = e;
- if (null != _courier)
- text = "Courier <" + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED";
- else
- text = "NULL courier can't deliverAsync Message";
- }
- catch (MalformedEPRException e)
- {
- thrown = e;
- if (null != _courier)
- text = "Courier <" + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED with malformed EPR.";
- else
- text = "NULL courier can't deliverAsync Message";
- }
-
- if (null == thrown)
- {
- File fileOK = new File(_postProcessDirectory, fileIn
- .getName()
- + _postProcessSuffix);
- if (_deleteAfterOK)
- {
- try
- {
- deleteFile(fileWork);
- }
- catch (GatewayException e)
- {
- _logger
- .error(
- "File "
- + fileIn
- + " has been processed and renamed to "
- + fileWork
- + ", but there were problems deleting it from the input directory ",
- e);
- }
- }
- else
- {
- try
- {
- renameFile(fileWork, fileOK);
- }
- catch (GatewayException e)
- {
- _logger
- .error(
- "File "
- + fileIn
- + " has been processed and renamed to "
- + fileWork
- + ", but there were problems renaming it to "
- + fileOK, e);
- }
- }
- }
- else
- {
- thrown.printStackTrace();
- _logger.error(text, thrown);
- File fileError = new File(_errorDirectory, fileIn.getName() + _errorSuffix);
- try
- {
- deleteFile(fileError);
- }
- catch (GatewayException e)
- {
- _logger.warn( "File : " + fileError + " did not exist.");
- }
- try
- {
- renameFile(fileWork, fileError);
- }
- catch (GatewayException e)
- {
- _logger.error("Problems renaming file " + fileWork + " to " + fileError, e);
- }
- }
- }
+ for (File fileIn : fileList)
+ {
+ // Try to rename - if unsuccessful, somebody else got it first
+ File fileWork = getWorkFileName(fileIn, _workingSuffix);
+ try
+ {
+ if (!renameFile(fileIn, fileWork))
+ continue;
}
- while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepBetweenPolls)) ;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread " + Thread.currentThread().getName());
- }
- } // ________________________________
-
- protected File getWorkFileName( File fileIn, String suffix )
+ catch (GatewayException e)
{
- return new File(fileIn.toString() + _workingSuffix);
+ _logger.error("Problems renaming file " + fileIn + " to "
+ + fileWork);
+ continue;
}
- /*
- * Extracted to simplify testing
- */
- protected Courier getCourier( EPR current ) throws CourierException, MalformedEPRException
+ Throwable thrown = null;
+ String text = null;
+ try
{
- return CourierFactory.getCourier(current);
- }
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ { fileWork });
+ if (null == obj)
+ {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ boolean bSent = false;
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- }
+ Message outMessage = (Message) obj;
+ Map<String, Object> params = new HashMap<String, Object>();
- /*
- * Is the input suffix valid for this type of gateway?
- */
-
- protected void checkInputSuffix () throws ConfigurationException
- {
- if (_inputSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_INPUT_SFX_TAG + " attribute");
- }
-
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- private void checkMyParms() throws ConfigurationException, RegistryException, GatewayException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
-
- // Polling interval
- String sAux = _config
- .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
-
- if (!Util.isNullString(sAux))
- {
+ params.put(Environment.ORIGINAL_FILE, fileIn);
+ params.put(Environment.GATEWAY_CONFIG, _config);
+
+ outMessage = FilterManager.getInstance().doOutputWork(outMessage, params);
+
+ for (EPR current : _targetEprs)
+ {
+ if (current instanceof FileEpr)
+ {
+ try
+ {
+ FileEpr fpr = (FileEpr) current;
+ FileEpr newEpr = new FileEpr(fpr.getURL());
+ newEpr.setPostDelete(false);
+ newEpr.setPostDirectory(fpr.getURL().getFile());
+ newEpr.setPostSuffix(fpr.getInputSuffix());
+ current = newEpr;
+ }
+ catch (Exception e)
+ {
+ _logger.error("Problems with file EPR", e);
+ }
+ }
+ _courier = getCourier(current);
try
{
- _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ replyEpr = null;
+ outMessage.getHeader().getCall().setTo(current);
+ if (_maxMillisForResponse > 0)
+ {
+ replyEpr = CourierUtil
+ .getDefaultReplyToEpr(current);
+ outMessage.getHeader().getCall().setReplyTo(
+ replyEpr);
+ }
+ if (_courier.deliver(outMessage))
+ {
+ bSent = true;
+ break;
+ }
}
- catch (NumberFormatException e)
+ finally
{
- _logger.warn("Invalid poll latency - keeping default of "
- + (_sleepBetweenPolls / 1000));
+ CourierUtil.cleanCourier(_courier);
}
- }
- else
- {
- _logger.warn("No value specified for: "
- + ListenerTagNames.POLL_LATENCY_SECS_TAG
- + " - Using default of " + (_sleepBetweenPolls / 1000));
- }
- resolveComposerClass();
-
- boolean hasResponder = _responderMethod!=null;
- _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait
- (_config, _logger, hasResponder);
- try
- {
- // INPUT directory and suffix (used for FileFilter)
- String url = _config.getAttribute(ListenerTagNames.URL_TAG);
- String sInpDir = (null != url) ? new URL(url).getFile() :
- ListenerUtil.obtainAtt(_config, ListenerTagNames.FILE_INPUT_DIR_TAG, null);
- _inputDirectory = fileFromString(sInpDir);
- seeIfOkToWorkOnDir(_inputDirectory);
-
- _inputSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_INPUT_SFX_TAG, null);
- _inputSuffix = _inputSuffix.trim();
-
- checkInputSuffix();
-
- // WORK suffix (will rename in input directory)
- _workingSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_WORK_SFX_TAG, ".esbWork").trim();
- if (_workingSuffix.length() < 1)
- throw new ConfigurationException("Invalid " + ListenerTagNames.FILE_WORK_SFX_TAG
- + " attribute");
-
- if (_inputSuffix.equals(_workingSuffix))
- throw new ConfigurationException("Work suffix must differ from input suffix <"
- + _workingSuffix + ">");
-
- // ERROR directory and suffix (defaults to input dir and ".esbError"
- // suffix)
- String sErrDir = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_ERROR_DIR_TAG, sInpDir);
- _errorDirectory = fileFromString(sErrDir);
- seeIfOkToWorkOnDir(_errorDirectory);
-
- _errorSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_ERROR_SFX_TAG, ".esbError").trim();
- if (_errorSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_ERROR_SFX_TAG + " attribute");
- if (_errorDirectory.equals(_inputDirectory)
- && _inputSuffix.equals(_errorSuffix))
- throw new ConfigurationException("Error suffix must differ from input suffix <"
- + _errorSuffix + ">");
-
- // Do users wish to delete files that were processed OK ?
- String sPostDel = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_DEL_TAG, "false").trim();
- _deleteAfterOK = Boolean.parseBoolean(sPostDel);
- if (_deleteAfterOK)
- return;
-
- // POST (done) directory and suffix (defaults to input dir and
- // ".esbDone" suffix)
- String sPostDir = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_DIR_TAG, sInpDir);
- _postProcessDirectory = fileFromString(sPostDir);
- seeIfOkToWorkOnDir(_postProcessDirectory);
- _postProcessSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_SFX_TAG, ".esbDone").trim();
-
- if (_postProcessDirectory.equals(_inputDirectory))
- {
- if (_postProcessSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_POST_SFX_TAG + " attribute");
- if (_postProcessSuffix.equals(_inputSuffix))
- throw new ConfigurationException(
- "Post process suffix must differ from input suffix <"
- + _postProcessSuffix + ">");
- }
- }
- catch (GatewayException ex)
- {
- throw ex;
- }
- catch (MalformedURLException ex)
- {
- throw new ConfigurationException(ex);
- }
- } // ________________________________
-
- protected void resolveComposerClass() throws ConfigurationException, GatewayException
- {
- String sProcessMethod = null;
- String sResponderMethod = null;
- try
- {
- _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
- sResponderMethod = _config.getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
+ }
+ if (!bSent)
+ {
+ text = "Target service <" + _targetServiceCategory
+ + "," + _targetServiceName
+ + "> is not registered";
+ thrown = new Exception(text);
+ }
+ else if (null != replyEpr)
+ {
+ TwoWayCourier replyCourier = CourierFactory
+ .getPickupCourier(replyEpr);
+ try
+ {
+ replyMsg = replyCourier
+ .pickup(_maxMillisForResponse);
+ _responderMethod.invoke(_composer, new Object[]
+ { replyMsg, fileIn });
}
- else
+ catch (CourierTimeoutException e)
{
- getDefaultComposer();
- sProcessMethod = "process";
- sResponderMethod = "respond";
+ thrown = e;
+ text = "Expected response was not received from invoked service";
+ replyMsg = MessageFactory.getInstance()
+ .getMessage();
+ String timedOut = "Service <"
+ + _targetServiceCategory + ","
+ + "_targetServiceName"
+ + "> timed out without sending response";
+ replyMsg.getBody()
+ .setByteArray(timedOut.getBytes());
+ _responderMethod.invoke(_composer, new Object[]
+ { replyMsg, fileIn });
}
-
- _processMethod = _composerClass.getMethod
- (sProcessMethod, new Class[]{ Object.class });
-
- _responderMethod = (null==sResponderMethod) ? null
- : _composerClass.getMethod(sResponderMethod
- , new Class[] {Message.class, File.class});
+ finally
+ {
+ if (null != replyCourier)
+ CourierUtil.cleanCourier(replyCourier);
+ }
+ }
}
- catch (InvocationTargetException ex)
+ catch (InvocationTargetException e)
{
- throw new ConfigurationException(ex);
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+
}
- catch (IllegalAccessException ex)
+ catch (IllegalAccessException e)
{
- throw new ConfigurationException(ex);
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
}
- catch (InstantiationException ex)
+ catch (ClassCastException e)
{
- throw new ConfigurationException(ex);
+ thrown = e;
+ text = "Action class method <" + _processMethod.getName()
+ + "> returned a non Message object";
}
- catch (NoSuchMethodException ex)
+ catch (CourierException e)
{
- throw new ConfigurationException(ex);
+ thrown = e;
+ if (null != _courier)
+ text = "Courier <" + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED";
+ else
+ text = "NULL courier can't deliverAsync Message";
}
- catch (ClassNotFoundException ex)
+ catch (MalformedEPRException e)
{
- throw new ConfigurationException(ex);
+ thrown = e;
+ if (null != _courier)
+ text = "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED with malformed EPR.";
+ else
+ text = "NULL courier can't deliverAsync Message";
}
- } // ________________________________
- private File fileFromString(String file)
- {
- try
+ if (null == thrown)
{
- return new File(new URI(file));
+ File fileOK = new File(_postProcessDirectory, fileIn
+ .getName()
+ + _postProcessSuffix);
+ if (_deleteAfterOK)
+ {
+ try
+ {
+ deleteFile(fileWork);
+ }
+ catch (GatewayException e)
+ {
+ _logger
+ .error(
+ "File "
+ + fileIn
+ + " has been processed and renamed to "
+ + fileWork
+ + ", but there were problems deleting it from the input directory ",
+ e);
+ }
+ }
+ else
+ {
+ try
+ {
+ renameFile(fileWork, fileOK);
+ }
+ catch (GatewayException e)
+ {
+ _logger
+ .error(
+ "File "
+ + fileIn
+ + " has been processed and renamed to "
+ + fileWork
+ + ", but there were problems renaming it to "
+ + fileOK, e);
+ }
+ }
}
- catch (Exception e)
+ else
{
- return new File(file);
+ thrown.printStackTrace();
+ _logger.error(text, thrown);
+ File fileError = new File(_errorDirectory, fileIn.getName()
+ + _errorSuffix);
+ try
+ {
+ deleteFile(fileError);
+ }
+ catch (GatewayException e)
+ {
+ _logger.warn("File : " + fileError + " did not exist.");
+ }
+ try
+ {
+ renameFile(fileWork, fileError);
+ }
+ catch (GatewayException e)
+ {
+ _logger.error("Problems renaming file " + fileWork
+ + " to " + fileError, e);
+ }
}
- } // ________________________________
+ }
+ }
+ while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
+ _sleepBetweenPolls));
- protected final static Logger _logger = Logger
- .getLogger(AbstractFileGateway.class);
+ if (_logger.isDebugEnabled())
+ {
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread "
+ + Thread.currentThread().getName());
+ }
+ } // ________________________________
- protected ConfigTree _config;
+ protected File getWorkFileName (File fileIn, String suffix)
+ {
+ return new File(fileIn.toString() + _workingSuffix);
+ }
- protected long _sleepBetweenPolls; // milliseconds
- protected long _maxMillisForResponse;
+ /*
+ * Extracted to simplify testing
+ */
+ protected Courier getCourier (EPR current) throws CourierException,
+ MalformedEPRException
+ {
+ return CourierFactory.getCourier(current);
+ }
- protected String _targetServiceCategory, _targetServiceName;
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException
+ * for errors while destroying.
+ */
+ protected void doDestroy () throws ManagedLifecycleException
+ {
+ }
- protected Collection<EPR> _targetEprs;
+ /*
+ * Is the input suffix valid for this type of gateway?
+ */
- protected String _composerName;
+ protected void checkInputSuffix () throws ConfigurationException
+ {
+ if (_inputSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_INPUT_SFX_TAG + " attribute");
+ }
- protected Class _composerClass;
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ private void checkMyParms () throws ConfigurationException,
+ RegistryException, GatewayException
+ {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- protected Object _composer;
+ // Polling interval
+ String sAux = _config
+ .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- protected Method _processMethod;
- protected Method _responderMethod;
+ if (!Util.isNullString(sAux))
+ {
+ try
+ {
+ _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid poll latency - keeping default of "
+ + (_sleepBetweenPolls / 1000));
+ }
+ }
+ else
+ {
+ _logger.warn("No value specified for: "
+ + ListenerTagNames.POLL_LATENCY_SECS_TAG
+ + " - Using default of " + (_sleepBetweenPolls / 1000));
+ }
- protected Courier _courier;
+ resolveComposerClass();
- protected boolean _deleteAfterOK;
+ boolean hasResponder = _responderMethod != null;
+ _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait(_config,
+ _logger, hasResponder);
+ try
+ {
+ // INPUT directory and suffix (used for FileFilter)
+ String url = _config.getAttribute(ListenerTagNames.URL_TAG);
+ String sInpDir = (null != url) ? new URL(url).getFile()
+ : ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_INPUT_DIR_TAG, null);
+ _inputDirectory = fileFromString(sInpDir);
+ seeIfOkToWorkOnDir(_inputDirectory);
- protected File _inputDirectory, _errorDirectory, _postProcessDirectory;
+ _inputSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_INPUT_SFX_TAG, null);
+ _inputSuffix = _inputSuffix.trim();
- protected String _inputSuffix, _postProcessSuffix, _workingSuffix,
- _errorSuffix;
+ checkInputSuffix();
- protected FileFilter _fileFilter;
-
- /** Message property name for original filename */
- public static final String ORIGINAL_FILE_NAME_MSG_PROP = "org.jboss.soa.esb.gateway.original.file.name";
-
+ // WORK suffix (will rename in input directory)
+ _workingSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_WORK_SFX_TAG, ".esbWork").trim();
+ if (_workingSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_WORK_SFX_TAG + " attribute");
+
+ if (_inputSuffix.equals(_workingSuffix))
+ throw new ConfigurationException(
+ "Work suffix must differ from input suffix <"
+ + _workingSuffix + ">");
+
+ // ERROR directory and suffix (defaults to input dir and
+ // ".esbError"
+ // suffix)
+ String sErrDir = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_ERROR_DIR_TAG, sInpDir);
+ _errorDirectory = fileFromString(sErrDir);
+ seeIfOkToWorkOnDir(_errorDirectory);
+
+ _errorSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_ERROR_SFX_TAG, ".esbError").trim();
+ if (_errorSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_ERROR_SFX_TAG + " attribute");
+ if (_errorDirectory.equals(_inputDirectory)
+ && _inputSuffix.equals(_errorSuffix))
+ throw new ConfigurationException(
+ "Error suffix must differ from input suffix <"
+ + _errorSuffix + ">");
+
+ // Do users wish to delete files that were processed OK ?
+ String sPostDel = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_DEL_TAG, "false").trim();
+ _deleteAfterOK = Boolean.parseBoolean(sPostDel);
+ if (_deleteAfterOK)
+ return;
+
+ // POST (done) directory and suffix (defaults to input dir and
+ // ".esbDone" suffix)
+ String sPostDir = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_DIR_TAG, sInpDir);
+ _postProcessDirectory = fileFromString(sPostDir);
+ seeIfOkToWorkOnDir(_postProcessDirectory);
+ _postProcessSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_SFX_TAG, ".esbDone").trim();
+
+ if (_postProcessDirectory.equals(_inputDirectory))
+ {
+ if (_postProcessSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_POST_SFX_TAG + " attribute");
+ if (_postProcessSuffix.equals(_inputSuffix))
+ throw new ConfigurationException(
+ "Post process suffix must differ from input suffix <"
+ + _postProcessSuffix + ">");
+ }
+ }
+ catch (GatewayException ex)
+ {
+ throw ex;
+ }
+ catch (MalformedURLException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
+
+ protected void resolveComposerClass () throws ConfigurationException,
+ GatewayException
+ {
+ String sProcessMethod = null;
+ String sResponderMethod = null;
+ try
+ {
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName)
+ { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ { ConfigTree.class });
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ sResponderMethod = _config
+ .getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
+ }
+ else
+ {
+ getDefaultComposer();
+ sProcessMethod = "process";
+ sResponderMethod = "respond";
+ }
+
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ { Object.class });
+
+ _responderMethod = (null == sResponderMethod) ? null
+ : _composerClass.getMethod(sResponderMethod, new Class[]
+ { Message.class, File.class });
+ }
+ catch (InvocationTargetException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ catch (IllegalAccessException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ catch (InstantiationException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ catch (NoSuchMethodException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ catch (ClassNotFoundException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
+
+ private File fileFromString (String file)
+ {
+ try
+ {
+ return new File(new URI(file));
+ }
+ catch (Exception e)
+ {
+ return new File(file);
+ }
+ } // ________________________________
+
+ protected final static Logger _logger = Logger
+ .getLogger(AbstractFileGateway.class);
+
+ protected ConfigTree _config;
+
+ protected long _sleepBetweenPolls; // milliseconds
+
+ protected long _maxMillisForResponse;
+
+ protected String _targetServiceCategory, _targetServiceName;
+
+ protected Collection<EPR> _targetEprs;
+
+ protected String _composerName;
+
+ protected Class _composerClass;
+
+ protected Object _composer;
+
+ protected Method _processMethod;
+
+ protected Method _responderMethod;
+
+ protected Courier _courier;
+
+ protected boolean _deleteAfterOK;
+
+ protected File _inputDirectory, _errorDirectory, _postProcessDirectory;
+
+ protected String _inputSuffix, _postProcessSuffix, _workingSuffix,
+ _errorSuffix;
+
+ protected FileFilter _fileFilter;
+
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -26,6 +26,8 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -43,10 +45,12 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.filter.FilterManager;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.NamingContext;
import org.jboss.soa.esb.listeners.ListenerTagNames;
@@ -61,386 +65,462 @@
public class JmsGatewayListener extends AbstractThreadedManagedLifecycle
{
- /**
+ /**
* serial version uid for this class
*/
- private static final long serialVersionUID = 5070422864110923930L;
+ private static final long serialVersionUID = 5070422864110923930L;
- public JmsGatewayListener (ConfigTree listenerConfig) throws ConfigurationException
- {
- super(listenerConfig) ;
- _config = listenerConfig;
- checkMyParms();
- } // __________________________________
+ public JmsGatewayListener(ConfigTree listenerConfig)
+ throws ConfigurationException
+ {
+ super(listenerConfig);
+ _config = listenerConfig;
+ checkMyParms();
+ } // __________________________________
-
- /**
+ /**
* Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
+ *
+ * @throws ManagedLifecycleException
+ * for errors while initialisation.
*/
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected registry exception", re) ;
- }
+ protected void doInitialise () throws ManagedLifecycleException
+ {
+ try
+ {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
- try
- {
- prepareMessageReceiver();
- }
- catch (final ConnectionException ce)
- {
- throw new ManagedLifecycleException("Unexpected connection exception from prepareMessageReceiver", ce);
- }
- catch (final JMSException jmse)
- {
- throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", jmse);
- }
- catch (final ConfigurationException ce)
- {
- throw new ManagedLifecycleException("Unexpected configuration exception from prepareMessageReceiver", ce);
- }
+ try
+ {
+ prepareMessageReceiver();
+ }
+ catch (final ConnectionException ce)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected connection exception from prepareMessageReceiver",
+ ce);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected JMS error from prepareMessageReceiver", jmse);
+ }
+ catch (final ConfigurationException ce)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected configuration exception from prepareMessageReceiver",
+ ce);
+ }
- if (_serviceName != null)
- {
- try
- {
- RegistryUtil.register(_config, _myEpr);
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _myEpr, re);
- }
- }
- }
+ if (_serviceName != null)
+ {
+ try
+ {
+ RegistryUtil.register(_config, _myEpr);
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected error during registration for epr "
+ + _myEpr, re);
+ }
+ }
+ }
- /**
+ /**
* Execute on the thread.
*/
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName() +
- " started on thread " + Thread.currentThread().getName());
- }
+ protected void doRun ()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
- while (isRunning())
+ while (isRunning())
+ {
+ javax.jms.Message msgIn = receiveOne();
+ if (null != msgIn)
+ {
+ try
{
- javax.jms.Message msgIn = receiveOne();
- if (null != msgIn) {
- try {
- Object obj = _processMethod.invoke(_composer,
- new Object[]{msgIn});
- if (null == obj) {
- _logger.warn("Action class method <" + _processMethod
- .getName() + "> returned a null object");
- continue;
- }
- // try to deliverAsync the composed message, using the
- // appropriate courier
- // to the target service
- try {
- boolean bSent = false;
- for (EPR current : _targetEprs) {
- _courier = CourierFactory.getCourier(current);
- try {
- if (_courier
- .deliver((Message) obj)) {
- bSent = true;
- break;
- }
- }
- finally {
- CourierUtil.cleanCourier(_courier);
- }
- }
- if (!bSent) {
- String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
- throw new Exception(text);
- }
- }
- catch (ClassCastException e) {
- _logger.error("Action class method <" + _processMethod
- .getName() + "> returned a non Message object",
- e);
- continue;
- }
- catch (CourierException e) {
- String text = (null != _courier) ? "Courier <" + _courier
- .getClass().getName() + ".deliverAsync(Message) FAILED" : "NULL courier can't deliverAsync Message";
- _logger.error(text, e);
- continue;
- }
- continue;
- }
- catch (InvocationTargetException e) {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
- catch (IllegalAccessException e) {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
- catch (Exception e) {
- _logger.error("Unexpected problem", e);
- }
- }
- }
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ { msgIn });
+ if (null == obj)
+ {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ // try to deliverAsync the composed message, using the
+ // appropriate courier
+ // to the target service
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(Environment.GATEWAY_CONFIG, _config);
+
+ obj = FilterManager.getInstance().doOutputWork((Message) obj, params);
+
+ try
+ {
+ boolean bSent = false;
+ for (EPR current : _targetEprs)
+ {
+ _courier = CourierFactory.getCourier(current);
+ try
+ {
+ if (_courier.deliver((Message) obj))
+ {
+ bSent = true;
+ break;
+ }
+ }
+ finally
+ {
+ CourierUtil.cleanCourier(_courier);
+ }
+ }
+ if (!bSent)
+ {
+ String text = "Target service <"
+ + _targetServiceCategory + ","
+ + _targetServiceName
+ + "> is not registered";
+ throw new Exception(text);
+ }
+ }
+ catch (ClassCastException e)
+ {
+ _logger.error("Action class method <"
+ + _processMethod.getName()
+ + "> returned a non Message object", e);
+ continue;
+ }
+ catch (CourierException e)
+ {
+ String text = (null != _courier) ? "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED"
+ : "NULL courier can't deliverAsync Message";
+ _logger.error(text, e);
+ continue;
+ }
+ continue;
+ }
+ catch (InvocationTargetException e)
+ {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected problem", e);
+ }
+ }
+ }
- _logger
- .debug("run() method of " + this.getClass().getSimpleName() + " finished on thread " + Thread
- .currentThread().getName());
- } // ________________________________
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread " + Thread.currentThread().getName());
+ } // ________________________________
- /**
+ /**
* Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
+ *
+ * @throws ManagedLifecycleException
+ * for errors while destroying.
*/
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
- }
+ protected void doDestroy () throws ManagedLifecycleException
+ {
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
- if (_messageReceiver != null)
- {
- try
- {
- _messageReceiver.close();
- }
- catch (final JMSException jmse) {} // ignore
- }
+ if (_messageReceiver != null)
+ {
+ try
+ {
+ _messageReceiver.close();
+ }
+ catch (final JMSException jmse)
+ {
+ } // ignore
+ }
- if (_queueSession != null)
- {
- _pool.closeSession(_queueSession);
- }
- }
-
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
+ if (_queueSession != null)
{
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ _pool.closeSession(_queueSession);
+ }
+ }
- _queueName = ListenerUtil.obtainAtt(_config,
- JMSEpr.DESTINATION_NAME_TAG, null);
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms () throws ConfigurationException
+ {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- resolveComposerClass();
+ _queueName = ListenerUtil.obtainAtt(_config,
+ JMSEpr.DESTINATION_NAME_TAG, null);
- // No problem if selector is null - everything in queue will be returned
- _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ resolveComposerClass();
+
+ // No problem if selector is null - everything in queue will be returned
+ _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ _logger.debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG
+ + " - All messages in queue will be received by this listener");
+ } // ________________________________
+
+ protected void resolveComposerClass () throws ConfigurationException
+ {
+ try
+ {
+ String sProcessMethod = null;
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName)
+ { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ { ConfigTree.class });
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ }
+ else
+ {
+ _composerName = PackageJmsMessageContents.class.getName();
+ _composerClass = PackageJmsMessageContents.class;
+ _composer = new PackageJmsMessageContents();
+ sProcessMethod = "process";
_logger
- .debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG + " - All messages in queue will be received by this listener");
- } // ________________________________
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+ + "> element found in configuration"
+ + " - Using default composer class : "
+ + _composerName);
+ }
- protected void resolveComposerClass () throws ConfigurationException
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ { Object.class });
+ }
+ catch (Exception ex)
{
- try
- {
- String sProcessMethod = null;
- _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
- }
- else
- {
- _composerName = PackageJmsMessageContents.class.getName();
- _composerClass = PackageJmsMessageContents.class;
- _composer = new PackageJmsMessageContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
- }
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[] { Object.class });
- }
- catch (Exception ex)
- {
- throw new ConfigurationException(ex);
- }
- } // ________________________________
+ private void prepareMessageReceiver () throws ConfigurationException,
+ JMSException, ConnectionException
+ {
+ _queueSession = null;
+ _queue = null;
- private void prepareMessageReceiver () throws ConfigurationException, JMSException, ConnectionException
+ Properties environment = new Properties();
+
+ String sJndiURL = _config.getAttribute(JMSEpr.JNDI_URL_TAG);
+ String sJndiContextFactory = _config
+ .getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
+ String sJndiPkgPrefix = _config
+ .getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
+ if (sJndiURL != null)
+ environment.setProperty(Context.PROVIDER_URL, sJndiURL);
+ if (sJndiContextFactory != null)
+ environment.setProperty(Context.INITIAL_CONTEXT_FACTORY,
+ sJndiContextFactory);
+ if (sJndiPkgPrefix != null)
+ environment.setProperty(Context.URL_PKG_PREFIXES, sJndiPkgPrefix);
+ Set<String> names = _config.getAttributeNames();
+ for (String name : names)
{
- _queueSession = null;
- _queue = null;
+ if (name.startsWith("java.naming."))
+ {
+ environment.setProperty(name, _config.getAttribute(name));
+ }
+ }
+ Context oJndiCtx = NamingContext.getServerContext(environment);
- Properties environment = new Properties();
+ if (null == oJndiCtx)
+ throw new ConfigurationException("Unable fo obtain jndi context <"
+ + sJndiURL + "," + sJndiContextFactory + ","
+ + sJndiPkgPrefix + ">");
- String sJndiURL = _config.getAttribute(JMSEpr.JNDI_URL_TAG);
- String sJndiContextFactory = _config.getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
- String sJndiPkgPrefix = _config.getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
- if (sJndiURL!=null) environment.setProperty(Context.PROVIDER_URL, sJndiURL);
- if (sJndiContextFactory!=null) environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, sJndiContextFactory);
- if (sJndiPkgPrefix!=null) environment.setProperty(Context.URL_PKG_PREFIXES, sJndiPkgPrefix);
- Set<String> names=_config.getAttributeNames();
- for (String name : names) {
- if (name.startsWith("java.naming.")) {
- environment.setProperty(name, _config.getAttribute(name));
- }
- }
- Context oJndiCtx = NamingContext.getServerContext(environment);
+ String sFactClass = ListenerUtil.obtainAtt(_config,
+ JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
+ _logger.debug("No value specified for "
+ + JMSEpr.CONNECTION_FACTORY_TAG + " attribute"
+ + " - Using default of: '" + sFactClass + "'");
+ _serviceCategory = _config
+ .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
+ _queueName, sFactClass, environment, _messageSelector);
- if (null == oJndiCtx)
- throw new ConfigurationException(
- "Unable fo obtain jndi context <" + sJndiURL + "," + sJndiContextFactory + "," + sJndiPkgPrefix + ">");
+ _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass,
+ JMSEpr.QUEUE_TYPE);
- String sFactClass = ListenerUtil.obtainAtt(_config,
- JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
- if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.CONNECTION_FACTORY_TAG + " attribute" + " - Using default of: '" + sFactClass + "'");
- _serviceCategory = _config
- .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
- _queueName, sFactClass, environment, _messageSelector);
+ try
+ {
+ _queueSession = _pool.getQueueSession();
+ }
+ catch (NamingException ne)
+ {
+ throw new ConfigurationException(
+ "Failed to obtain queue session from pool", ne);
+ }
- _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass, JMSEpr.QUEUE_TYPE);
+ try
+ {
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException nex)
+ {
+ try
+ {
+ oJndiCtx = NamingContext.getServerContext(environment);
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException ne)
+ {
+ _queue = _queueSession.createQueue(_queueName);
+ }
+ }
- try {
- _queueSession = _pool.getQueueSession();
- } catch (NamingException ne) {
- throw new ConfigurationException("Failed to obtain queue session from pool", ne) ;
- }
+ _messageReceiver = _queueSession.createReceiver(_queue,
+ _messageSelector);
+ } // ________________________________
- try {
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- } catch (NamingException nex) {
- try {
- oJndiCtx = NamingContext.getServerContext(environment);
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- } catch (NamingException ne) {
- _queue = _queueSession.createQueue(_queueName);
- }
- }
+ /**
+ * Receive one message and retry if connection
+ *
+ * @return javax.jms.Message - One input message, or null
+ */
+ protected javax.jms.Message receiveOne ()
+ {
+ while (isRunning())
+ try
+ {
+ javax.jms.Message ret = _messageReceiver.receive(200);
+ if (null != ret)
+ return ret;
+ }
+ catch (JMSException oJ)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger
+ .debug(
+ "JMS error on receive. Attempting JMS Destination reconnect.",
+ oJ);
+ }
+ try
+ {
+ prepareMessageReceiver();
+ errorDelay = 0;
+ }
+ // try to reconnect to the queue
+ catch (Exception e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Reconnecting to Queue", e);
+ }
+ if (errorDelay == 0)
+ {
+ errorDelay = MIN_ERROR_DELAY;
+ }
+ else if (errorDelay < MAX_ERROR_DELAY)
+ {
+ errorDelay <<= 1;
+ }
+ _logger
+ .warn("Error reconnecting to Queue, backing off for "
+ + errorDelay + " milliseconds");
+ waitForRunningStateChange(
+ ManagedLifecycleThreadState.STOPPING, errorDelay);
+ }
+ }
+ return null;
+ } // ________________________________
- _messageReceiver = _queueSession.createReceiver(_queue,
- _messageSelector);
- } // ________________________________
+ protected final static Logger _logger = Logger
+ .getLogger(JmsGatewayListener.class);
- /**
- * Receive one message and retry if connection
- *
- * @return javax.jms.Message - One input message, or null
- */
- protected javax.jms.Message receiveOne ()
- {
- while (isRunning())
- try
- {
- javax.jms.Message ret = _messageReceiver.receive(200);
- if (null != ret) return ret;
- }
- catch (JMSException oJ)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("JMS error on receive. Attempting JMS Destination reconnect.",oJ);
- }
- try
- {
- prepareMessageReceiver();
- errorDelay = 0 ;
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconnecting to Queue", e);
- }
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- _logger.warn("Error reconnecting to Queue, backing off for " + errorDelay + " milliseconds") ;
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
- }
- }
- return null;
- } // ________________________________
+ protected String _queueName;
- protected final static Logger _logger = Logger
- .getLogger(JmsGatewayListener.class);
+ protected QueueSession _queueSession;
- protected String _queueName;
+ protected Queue _queue;
- protected QueueSession _queueSession;
+ protected MessageConsumer _messageReceiver;
- protected Queue _queue;
+ protected String _messageSelector;
- protected MessageConsumer _messageReceiver;
+ protected ConfigTree _config;
- protected String _messageSelector;
+ protected String _serviceCategory, _serviceName;
- protected ConfigTree _config;
+ protected String _targetServiceCategory, _targetServiceName;
- protected String _serviceCategory, _serviceName;
+ protected EPR _myEpr;
- protected String _targetServiceCategory, _targetServiceName;
+ protected Collection<EPR> _targetEprs;
- protected EPR _myEpr;
+ protected String _composerName;
- protected Collection<EPR> _targetEprs;
+ protected Class _composerClass;
- protected String _composerName;
+ protected Object _composer;
- protected Class _composerClass;
+ protected Method _processMethod;
- protected Object _composer;
+ protected Courier _courier;
- protected Method _processMethod;
+ protected JmsConnectionPool _pool;
- protected Courier _courier;
-
- protected JmsConnectionPool _pool;
/**
- * The minimum error delay.
- */
- private static final long MIN_ERROR_DELAY = 1000 ;
+ * The minimum error delay.
+ */
+ private static final long MIN_ERROR_DELAY = 1000;
+
/**
- * The maximum error delay.
- */
- private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5) ;
-
+ * The maximum error delay.
+ */
+ private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
+
/**
- * The error delay.
- */
- private long errorDelay ;
+ * The error delay.
+ */
+ private long errorDelay;
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -46,10 +46,12 @@
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.filter.FilterManager;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
@@ -86,742 +88,781 @@
public class SqlTableGatewayListener extends AbstractThreadedManagedLifecycle
{
- /**
+ /**
* serial version uid for this class
*/
- private static final long serialVersionUID = -4394272471377134121L;
-
- public SqlTableGatewayListener(ConfigTree config)
- throws ConfigurationException
- {
- super(config) ;
- _config = config;
- _sleepBetweenPolls = 10000; // milliseconds TODO magic number
- checkMyParms();
- } // __________________________________
-
- /**
+ private static final long serialVersionUID = -4394272471377134121L;
+
+ public SqlTableGatewayListener(ConfigTree config)
+ throws ConfigurationException
+ {
+ super(config);
+ _config = config;
+ _sleepBetweenPolls = 10000; // milliseconds TODO magic number
+ checkMyParms();
+ } // __________________________________
+
+ /**
* Handle the initialisation of the managed instance.
*
- * @throws ManagedLifecycleException for errors while initialisation.
+ * @throws ManagedLifecycleException
+ * for errors while initialisation.
*/
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected registry exception", re) ;
- }
-
- boolean failure = true ;
- try
- {
- prepareStatements();
- failure = false ;
- }
- catch (final SQLException sqle)
- {
- throw new ManagedLifecycleException("Unexpected error initialising statements", sqle);
- }
- finally
- {
- if (failure)
- {
- if (_dbConn != null)
- {
- _dbConn.release();
- _dbConn = null ;
- }
- }
- }
- }
+ protected void doInitialise () throws ManagedLifecycleException
+ {
+ try
+ {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("doRun() method of " + this.getClass().getSimpleName() +
- " started on thread " + Thread.currentThread().getName());
- }
-
- do
+ boolean failure = true;
+ try
+ {
+ prepareStatements();
+ failure = false;
+ }
+ catch (final SQLException sqle)
+ {
+ throw new ManagedLifecycleException(
+ "Unexpected error initialising statements", sqle);
+ }
+ finally
+ {
+ if (failure)
+ {
+ if (_dbConn != null)
{
- for (Map<String, Object> row : pollForCandidates())
- {
- _currentRow = row;
- // Try to mark as 'in process' - if unsuccessful, somebody else
- // got it first
- if (!changeStatusToWorking()) continue;
-
- Throwable thrown = null;
- String text = null;
- try
- {
- Object obj = _processMethod.invoke(_composer,
- new Object[] { _currentRow });
- if (null == obj)
- {
- _logger.warn("Action class method <" + _processMethod
- .getName() + "> returned a null object");
- continue;
- }
- Message message = (Message) obj;
-
- // if(_composerClass.equals(PackageRowContents.class))
- // {
- // Properties props = message.getProperties();
- // props.setProperty(JDBCEpr.DRIVER_TAG ,_driver);
- // props.setProperty(JDBCEpr.URL_TAG ,_url);
- // props.setProperty(JDBCEpr.USERNAME_TAG ,_user);
- // props.setProperty(JDBCEpr.PASSWORD_TAG , _password);
- //
- // }
- boolean bSent = false;
- for (EPR current : _targetEprs)
- {
- _courier = CourierFactory.getCourier(current);
- try
- {
- if (_courier.deliver(message))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier) ;
- }
- }
- if (!bSent)
- {
- text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
- thrown = new Exception(text);
- }
- }
- catch (InvocationTargetException e)
- {
- thrown = e;
- text = "Problems invoking method <" + _processMethod
- .getName() + ">";
- }
- catch (IllegalAccessException e)
- {
- thrown = e;
- text = "Problems invoking method <" + _processMethod
- .getName() + ">";
- }
- catch (ClassCastException e)
- {
- thrown = e;
- text = "Action class method <" + _processMethod.getName() + "> returned a non Message object";
- }
- catch (CourierException e)
- {
- thrown = e;
- text = "Courier <" + _courier.getClass().getName() + ".deliverAsync(Message) FAILED";
- }
- catch (MalformedEPRException ex)
- {
- thrown = ex;
- text = "Courier <" + _courier.getClass().getName() + ".deliverAsync(Message) FAILED with malformed EPR.";
- }
-
- if (null == thrown)
- {
- if (_deleteAfterOK) deleteCurrentRow();
- else
- changeStatusToDone();
- }
- else
- {
- _logger.error(text);
- _logger.debug(text, thrown);
- changeStatusToError();
- }
- }
+ _dbConn.release();
+ _dbConn = null;
}
- while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepBetweenPolls)) ;
+ }
+ }
+ }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName() +
- " finished on thread " + Thread.currentThread().getName());
- }
- } // ________________________________
-
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
+ /**
+ * Execute on the thread.
*/
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- if (_dbConn != null)
- {
- _dbConn.release();
- }
- }
+ protected void doRun ()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("doRun() method of "
+ + this.getClass().getSimpleName() + " started on thread "
+ + Thread.currentThread().getName());
+ }
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- private void checkMyParms () throws ConfigurationException
+ do
{
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ for (Map<String, Object> row : pollForCandidates())
+ {
+ _currentRow = row;
+ // Try to mark as 'in process' - if unsuccessful, somebody else
+ // got it first
+ if (!changeStatusToWorking())
+ continue;
- // Polling interval
- String sAux = _config
- .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
-
- if (!Util.isNullString(sAux))
+ Throwable thrown = null;
+ String text = null;
+ try
{
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ { _currentRow });
+ if (null == obj)
+ {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ Message message = (Message) obj;
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(Environment.GATEWAY_CONFIG, _config);
+
+ message = FilterManager.getInstance().doOutputWork(message, params);
+
+ boolean bSent = false;
+ for (EPR current : _targetEprs)
+ {
+ _courier = CourierFactory.getCourier(current);
try
{
- _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ if (_courier.deliver(message))
+ {
+ bSent = true;
+ break;
+ }
}
- catch (NumberFormatException e)
+ finally
{
- _logger
- .warn("Invalid poll latency - keeping default of " + (_sleepBetweenPolls / 1000));
+ CourierUtil.cleanCourier(_courier);
}
+ }
+ if (!bSent)
+ {
+ text = "Target service <" + _targetServiceCategory
+ + "," + _targetServiceName
+ + "> is not registered";
+ thrown = new Exception(text);
+ }
}
+ catch (InvocationTargetException e)
+ {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+ }
+ catch (IllegalAccessException e)
+ {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+ }
+ catch (ClassCastException e)
+ {
+ thrown = e;
+ text = "Action class method <" + _processMethod.getName()
+ + "> returned a non Message object";
+ }
+ catch (CourierException e)
+ {
+ thrown = e;
+ text = "Courier <" + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED";
+ }
+ catch (MalformedEPRException ex)
+ {
+ thrown = ex;
+ text = "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED with malformed EPR.";
+ }
+
+ if (null == thrown)
+ {
+ if (_deleteAfterOK)
+ deleteCurrentRow();
+ else
+ changeStatusToDone();
+ }
else
{
- _logger
- .warn("No value specified for: " + ListenerTagNames.POLL_LATENCY_SECS_TAG + " - Using default of " + (_sleepBetweenPolls / 1000));
+ _logger.error(text);
+ _logger.debug(text, thrown);
+ changeStatusToError();
}
+ }
+ }
+ while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
+ _sleepBetweenPolls));
- resolveComposerClass();
+ if (_logger.isDebugEnabled())
+ {
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread "
+ + Thread.currentThread().getName());
+ }
+ } // ________________________________
- _driver = ListenerUtil.obtainAtt(_config, JDBCEpr.DRIVER_TAG, null);
- _url = ListenerUtil.obtainAtt(_config, JDBCEpr.URL_TAG, null);
- _user = ListenerUtil.obtainAtt(_config, JDBCEpr.USERNAME_TAG, null);
- _password = ListenerUtil.obtainAtt(_config, JDBCEpr.PASSWORD_TAG, "");
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException
+ * for errors while destroying.
+ */
+ protected void doDestroy () throws ManagedLifecycleException
+ {
+ if (_dbConn != null)
+ {
+ _dbConn.release();
+ }
+ }
- _tableName = _config.getAttribute(ListenerTagNames.SQL_TABLE_NAME_TAG);
- if (null == _tableName)
- _tableName = _config.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG);
- if (Util.isNullString(_tableName))
- throw new ConfigurationException("Empty or invalid table name");
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ private void checkMyParms () throws ConfigurationException
+ {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- _selectFields = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_SELECT_FIELDS_TAG, "*");
- if (Util.isNullString(_selectFields))
- throw new ConfigurationException(
- "Empty or invalid list of select fields");
- _keyFields = _config.getAttribute(ListenerTagNames.SQL_KEY_FIELDS_TAG);
- if (null == _keyFields)
- _keyFields = _config.getRequiredAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG);
- if (Util.isNullString(_keyFields))
- throw new ConfigurationException(
- "Empty or invalid list of key fields");
- _inProcessField = _config
- .getAttribute(ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG);
- if (null == _inProcessField)
- _inProcessField = _config.getAttribute(JDBCEpr.STATUS_COLUMN_TAG);
- if (Util.isNullString(_inProcessField))
- throw new ConfigurationException(
- "A valid inProcessField attribute must be specified");
+ // Polling interval
+ String sAux = _config
+ .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- _where = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_WHERE_CONDITION_TAG, "");
- if (_where.trim().length() < 1)
- _logger
- .debug("No value specified for: " + ListenerTagNames.SQL_WHERE_CONDITION_TAG);
- _orderBy = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_ORDER_BY_TAG, "");
- if (_orderBy.trim().length() < 1)
- _logger
- .debug("No value specified for: " + ListenerTagNames.SQL_ORDER_BY_TAG);
- _inProcessVals = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG,
- DEFAULT_IN_PROCESS_STATES);
+ if (!Util.isNullString(sAux))
+ {
+ try
+ {
+ _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid poll latency - keeping default of "
+ + (_sleepBetweenPolls / 1000));
+ }
+ }
+ else
+ {
+ _logger.warn("No value specified for: "
+ + ListenerTagNames.POLL_LATENCY_SECS_TAG
+ + " - Using default of " + (_sleepBetweenPolls / 1000));
+ }
- _deleteAfterOK = Boolean.parseBoolean(ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_POST_DEL_TAG, "false"));
- if (null == _config.getAttribute(ListenerTagNames.SQL_POST_DEL_TAG))
- _logger
- .debug("No value specified for: " + ListenerTagNames.SQL_POST_DEL_TAG + " - trigger row will not be deleted - 'in process field' will be used to show processing status");
+ resolveComposerClass();
- if (_inProcessVals.length() < 4)
- throw new ConfigurationException(
- "Parameter <" + ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG + "> must be at least 4 characters long (PWED)");
+ _driver = ListenerUtil.obtainAtt(_config, JDBCEpr.DRIVER_TAG, null);
+ _url = ListenerUtil.obtainAtt(_config, JDBCEpr.URL_TAG, null);
+ _user = ListenerUtil.obtainAtt(_config, JDBCEpr.USERNAME_TAG, null);
+ _password = ListenerUtil.obtainAtt(_config, JDBCEpr.PASSWORD_TAG, "");
- _columns = _selectFields.split(",");
- if (_columns.length < 1)
- throw new ConfigurationException("Empty list of select fields");
+ _tableName = _config.getAttribute(ListenerTagNames.SQL_TABLE_NAME_TAG);
+ if (null == _tableName)
+ _tableName = _config.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG);
+ if (Util.isNullString(_tableName))
+ throw new ConfigurationException("Empty or invalid table name");
- _keys = _keyFields.split(",");
- if (!"*".equals(_selectFields))
- {
- Set<String> colSet = new HashSet<String>(Arrays.asList(_columns));
- if (_keys.length < 1)
- throw new ConfigurationException("Empty list of keyFields");
- for (String currKey : _keys)
- {
- if (colSet.contains(currKey)) continue;
- else
- {
- StringBuilder sb = new StringBuilder().append(
- "All key field names in the <").append(
- ListenerTagNames.SQL_KEY_FIELDS_TAG).append(
- "> attribute must be in the ").append(
- ListenerTagNames.SQL_SELECT_FIELDS_TAG).append(
- "list - '").append(currKey)
- .append("' is not there");
- throw new ConfigurationException(sb.toString());
- }
- }
- }
- } // ________________________________
+ _selectFields = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_SELECT_FIELDS_TAG, "*");
+ if (Util.isNullString(_selectFields))
+ throw new ConfigurationException(
+ "Empty or invalid list of select fields");
+ _keyFields = _config.getAttribute(ListenerTagNames.SQL_KEY_FIELDS_TAG);
+ if (null == _keyFields)
+ _keyFields = _config
+ .getRequiredAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG);
+ if (Util.isNullString(_keyFields))
+ throw new ConfigurationException(
+ "Empty or invalid list of key fields");
+ _inProcessField = _config
+ .getAttribute(ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG);
+ if (null == _inProcessField)
+ _inProcessField = _config.getAttribute(JDBCEpr.STATUS_COLUMN_TAG);
+ if (Util.isNullString(_inProcessField))
+ throw new ConfigurationException(
+ "A valid inProcessField attribute must be specified");
- protected void prepareStatements () throws SQLException
- {
- _PSscan = getDbConn().prepareStatement(scanStatement());
- _PSupdate = getDbConn().prepareStatement(updateStatement());
- _PSdeleteRow = getDbConn().prepareStatement(deleteStatement());
- } // ________________________________
+ _where = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_WHERE_CONDITION_TAG, "");
+ if (_where.trim().length() < 1)
+ _logger.debug("No value specified for: "
+ + ListenerTagNames.SQL_WHERE_CONDITION_TAG);
+ _orderBy = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_ORDER_BY_TAG, "");
+ if (_orderBy.trim().length() < 1)
+ _logger.debug("No value specified for: "
+ + ListenerTagNames.SQL_ORDER_BY_TAG);
+ _inProcessVals = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG,
+ DEFAULT_IN_PROCESS_STATES);
- /*
- * Throw ConfigurationException for anything to do with setup. Ultimately
- * could do with finer grained error handling. Probably need different types
- * of setup exceptions.
- */
-
- protected void resolveComposerClass () throws ConfigurationException
+ _deleteAfterOK = Boolean.parseBoolean(ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_POST_DEL_TAG, "false"));
+ if (null == _config.getAttribute(ListenerTagNames.SQL_POST_DEL_TAG))
+ _logger
+ .debug("No value specified for: "
+ + ListenerTagNames.SQL_POST_DEL_TAG
+ + " - trigger row will not be deleted - 'in process field' will be used to show processing status");
+
+ if (_inProcessVals.length() < 4)
+ throw new ConfigurationException("Parameter <"
+ + ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG
+ + "> must be at least 4 characters long (PWED)");
+
+ _columns = _selectFields.split(",");
+ if (_columns.length < 1)
+ throw new ConfigurationException("Empty list of select fields");
+
+ _keys = _keyFields.split(",");
+ if (!"*".equals(_selectFields))
{
- try
+ Set<String> colSet = new HashSet<String>(Arrays.asList(_columns));
+ if (_keys.length < 1)
+ throw new ConfigurationException("Empty list of keyFields");
+ for (String currKey : _keys)
+ {
+ if (colSet.contains(currKey))
+ continue;
+ else
{
- String sProcessMethod = null;
- _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
- }
- else
- {
- _composerName = PackageRowContents.class.getName();
- _composerClass = PackageRowContents.class;
- _composer = new PackageRowContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
- }
-
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[] { Object.class });
+ StringBuilder sb = new StringBuilder().append(
+ "All key field names in the <").append(
+ ListenerTagNames.SQL_KEY_FIELDS_TAG).append(
+ "> attribute must be in the ").append(
+ ListenerTagNames.SQL_SELECT_FIELDS_TAG).append(
+ "list - '").append(currKey)
+ .append("' is not there");
+ throw new ConfigurationException(sb.toString());
}
- catch (InvocationTargetException ex)
- {
- _logger.debug(ex);
-
- throw new ConfigurationException(ex);
- }
- catch (IllegalAccessException ex)
- {
- _logger.debug(ex);
-
- throw new ConfigurationException(ex);
- }
- catch (InstantiationException ex)
- {
- _logger.debug(ex);
-
- throw new ConfigurationException(ex);
- }
- catch (ClassNotFoundException ex)
- {
- _logger.debug(ex);
-
- throw new ConfigurationException(ex);
- }
- catch (NoSuchMethodException ex)
- {
- _logger.debug(ex);
-
- throw new ConfigurationException(ex);
- }
- } // ________________________________
+ }
+ }
+ } // ________________________________
- protected List<Map<String, Object>> pollForCandidates ()
+ protected void prepareStatements () throws SQLException
+ {
+ _PSscan = getDbConn().prepareStatement(scanStatement());
+ _PSupdate = getDbConn().prepareStatement(updateStatement());
+ _PSdeleteRow = getDbConn().prepareStatement(deleteStatement());
+ } // ________________________________
+
+ /*
+ * Throw ConfigurationException for anything to do with setup.
+ * Ultimately could do with finer grained error handling. Probably need
+ * different types of setup exceptions.
+ */
+
+ protected void resolveComposerClass () throws ConfigurationException
+ {
+ try
{
- List<Map<String, Object>> oResults = new ArrayList<Map<String, Object>>();
- final JdbcCleanConn oConn = getDbConn();
- try
- {
- ResultSet RS = oConn.execQueryWait(_PSscan, 1);
- ResultSetMetaData meta = RS.getMetaData();
- while (RS.next())
- {
- Map<String, Object> row = new HashMap<String, Object>();
- for (int iCurr = 1; iCurr <= meta.getColumnCount(); iCurr++)
- {
- String sCol = meta.getColumnName(iCurr);
- if (!_inProcessField.equals(sCol))
- row.put(sCol, RS.getObject(iCurr));
- }
+ String sProcessMethod = null;
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName)
+ { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ { ConfigTree.class });
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ }
+ else
+ {
+ _composerName = PackageRowContents.class.getName();
+ _composerClass = PackageRowContents.class;
+ _composer = new PackageRowContents();
+ sProcessMethod = "process";
+ _logger
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+ + "> element found in configuration"
+ + " - Using default composer class : "
+ + _composerName);
+ }
- oResults.add(row);
- }
- }
- catch (Exception e)
- {
- _logger.debug("Some triggers might not have been returned", e);
- }
- finally
- {
- try
- {
- oConn.rollback() ;
- }
- catch (final SQLException sqle) {}
- }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Returning " + oResults.size() + " rows.\n");
- }
- return oResults;
- } // ________________________________
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ { Object.class });
+ }
+ catch (InvocationTargetException ex)
+ {
+ _logger.debug(ex);
- /**
- * Obtain a new database connection with parameter info
- *
- * @return A new connection
- * @throws ConfigurationException -
- * if problems are encountered
- */
- protected JdbcCleanConn getDbConn()
+ throw new ConfigurationException(ex);
+ }
+ catch (IllegalAccessException ex)
{
- if (null == _dbConn)
- {
- DataSource oDS = new SimpleDataSource(_driver, _url, _user,
- _password);
- _dbConn = new JdbcCleanConn(oDS);
- }
- return _dbConn;
- } // ________________________________
+ _logger.debug(ex);
- /**
- * Assemble the SQL statement to scan (poll) the table
- *
- * @return - The resulting SQL statement
- */
- protected String scanStatement ()
+ throw new ConfigurationException(ex);
+ }
+ catch (InstantiationException ex)
{
- StringBuilder sb = new StringBuilder().append("select ").append(
- _selectFields).append(" from ").append(_tableName);
+ _logger.debug(ex);
- boolean bWhere = !Util.isNullString(_where);
- if (bWhere) sb.append(" where ").append(_where);
- sb.append((bWhere) ? " and " : " where ");
+ throw new ConfigurationException(ex);
+ }
+ catch (ClassNotFoundException ex)
+ {
+ _logger.debug(ex);
- String sLike = _inProcessVals.substring(0, 1).toUpperCase();
- sb.append(" upper(").append(_inProcessField).append(") like '").append(
- sLike).append("%'");
+ throw new ConfigurationException(ex);
+ }
+ catch (NoSuchMethodException ex)
+ {
+ _logger.debug(ex);
- if (!Util.isNullString(_orderBy))
- sb.append(" order by ").append(_orderBy);
- return sb.toString();
- } // ________________________________
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
- /**
- * Assemble the SQL statement to update the field in the "inProcessField"
- * parameter
- *
- * in the table row uniquely identified by the list of fields in the
- * "keyFields" parameter
- *
- * @return - The resulting SQL statement
- */
- protected String updateStatement ()
+ protected List<Map<String, Object>> pollForCandidates ()
+ {
+ List<Map<String, Object>> oResults = new ArrayList<Map<String, Object>>();
+ final JdbcCleanConn oConn = getDbConn();
+ try
{
- StringBuilder sb = new StringBuilder().append("update ").append(
- _tableName).append(" set ").append(_inProcessField).append(
- " = ? where ").append(_inProcessField).append(" = ?");
- for (String sCurr : _keys)
+ ResultSet RS = oConn.execQueryWait(_PSscan, 1);
+ ResultSetMetaData meta = RS.getMetaData();
+ while (RS.next())
+ {
+ Map<String, Object> row = new HashMap<String, Object>();
+ for (int iCurr = 1; iCurr <= meta.getColumnCount(); iCurr++)
{
- sb.append(" and ").append(sCurr).append(" = ?");
+ String sCol = meta.getColumnName(iCurr);
+ if (!_inProcessField.equals(sCol))
+ row.put(sCol, RS.getObject(iCurr));
}
- return sb.toString();
- } // ________________________________
- /**
- * Assemble the SQL "select for update" statement for the "inProcessField"
- * parameter
- *
- * in the table row uniquely identified by the list of fields in the
- * "keyFields" parameter
- *
- * @return - The resulting SQL statement
- */
- protected String selectForUpdStatement ()
+ oResults.add(row);
+ }
+ }
+ catch (Exception e)
{
- StringBuilder sb = new StringBuilder().append("select ").append(
- _inProcessField).append(" from ").append(_tableName).append(
- " where ");
- int iCurr = 0;
- for (String sCurr : _keys)
- {
- if (iCurr++ > 0) sb.append(" and ");
- sb.append(sCurr).append(" = ?");
- }
-
- /*
- * HS QL does not support FOR UPDATE! All tables appear to
- * be inherently updatable!
- */
-
- if (_driver.contains("hsqldb"))
- return sb.toString();
- else
- return sb.append(" for update").toString();
- } // ________________________________
+ _logger.debug("Some triggers might not have been returned", e);
+ }
+ finally
+ {
+ try
+ {
+ oConn.rollback();
+ }
+ catch (final SQLException sqle)
+ {
+ }
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Returning " + oResults.size() + " rows.\n");
+ }
+ return oResults;
+ } // ________________________________
- /**
- * Assemble the SQL statement to delete the current row in the table row
- * uniquely identified by the list of fields in the "keyFields" parameter
- *
- * @return - The resulting SQL statement
- */
- protected String deleteStatement ()
+ /**
+ * Obtain a new database connection with parameter info
+ *
+ * @return A new connection
+ * @throws ConfigurationException -
+ * if problems are encountered
+ */
+ protected JdbcCleanConn getDbConn ()
+ {
+ if (null == _dbConn)
{
- StringBuilder sb = new StringBuilder().append("delete from ").append(
- _tableName).append(" where ");
- int iCurr = 0;
- for (String sCurr : _keys)
- {
- if (iCurr++ > 0) sb.append(" and ");
- sb.append(sCurr).append(" = ?");
- }
- return sb.toString();
- } // ________________________________
+ DataSource oDS = new SimpleDataSource(_driver, _url, _user,
+ _password);
+ _dbConn = new JdbcCleanConn(oDS);
+ }
+ return _dbConn;
+ } // ________________________________
- /**
- * Try to delete 'current row' from polled table
- *
- * @return true if row deletion was successful - false otherwise
- */
- protected boolean deleteCurrentRow ()
+ /**
+ * Assemble the SQL statement to scan (poll) the table
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String scanStatement ()
+ {
+ StringBuilder sb = new StringBuilder().append("select ").append(
+ _selectFields).append(" from ").append(_tableName);
+
+ boolean bWhere = !Util.isNullString(_where);
+ if (bWhere)
+ sb.append(" where ").append(_where);
+ sb.append((bWhere) ? " and " : " where ");
+
+ String sLike = _inProcessVals.substring(0, 1).toUpperCase();
+ sb.append(" upper(").append(_inProcessField).append(") like '").append(
+ sLike).append("%'");
+
+ if (!Util.isNullString(_orderBy))
+ sb.append(" order by ").append(_orderBy);
+ return sb.toString();
+ } // ________________________________
+
+ /**
+ * Assemble the SQL statement to update the field in the
+ * "inProcessField" parameter
+ *
+ * in the table row uniquely identified by the list of fields in the
+ * "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String updateStatement ()
+ {
+ StringBuilder sb = new StringBuilder().append("update ").append(
+ _tableName).append(" set ").append(_inProcessField).append(
+ " = ? where ").append(_inProcessField).append(" = ?");
+ for (String sCurr : _keys)
{
- try
- {
- int iParm = 1;
- for (String sColName : _keys)
- {
- final String val = String.valueOf(_currentRow.get(sColName));
- _PSdeleteRow.setString(iParm++, val);
- }
+ sb.append(" and ").append(sCurr).append(" = ?");
+ }
+ return sb.toString();
+ } // ________________________________
- try
- {
- getDbConn().execUpdWait(_PSdeleteRow, 5);
- getDbConn().commit();
- return true;
- }
- catch (Exception e)
- {
- _logger.debug("Delete row has failed. Rolling back!!", e);
- }
+ /**
+ * Assemble the SQL "select for update" statement for the
+ * "inProcessField" parameter
+ *
+ * in the table row uniquely identified by the list of fields in the
+ * "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String selectForUpdStatement ()
+ {
+ StringBuilder sb = new StringBuilder().append("select ").append(
+ _inProcessField).append(" from ").append(_tableName).append(
+ " where ");
+ int iCurr = 0;
+ for (String sCurr : _keys)
+ {
+ if (iCurr++ > 0)
+ sb.append(" and ");
+ sb.append(sCurr).append(" = ?");
+ }
- try
- {
- getDbConn().rollback();
- }
- catch (Exception e)
- {
- _logger.debug("Unable to rollback delete row", e);
- }
- }
- catch (Exception e)
- {
- _logger.debug("Unexpected exception.", e);
- }
- return false;
- } // ________________________________
+ /*
+ * HS QL does not support FOR UPDATE! All tables appear to be inherently
+ * updatable!
+ */
- protected String getStatus (ROW_STATE p_oState)
+ if (_driver.contains("hsqldb"))
+ return sb.toString();
+ else
+ return sb.append(" for update").toString();
+ } // ________________________________
+
+ /**
+ * Assemble the SQL statement to delete the current row in the table row
+ * uniquely identified by the list of fields in the "keyFields"
+ * parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String deleteStatement ()
+ {
+ StringBuilder sb = new StringBuilder().append("delete from ").append(
+ _tableName).append(" where ");
+ int iCurr = 0;
+ for (String sCurr : _keys)
{
- int iPos = p_oState.ordinal();
- return _inProcessVals.substring(iPos, ++iPos);
- } // ________________________________
+ if (iCurr++ > 0)
+ sb.append(" and ");
+ sb.append(sCurr).append(" = ?");
+ }
+ return sb.toString();
+ } // ________________________________
- protected boolean changeStatusToWorking ()
+ /**
+ * Try to delete 'current row' from polled table
+ *
+ * @return true if row deletion was successful - false otherwise
+ */
+ protected boolean deleteCurrentRow ()
+ {
+ try
{
- return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
- } // ________________________________
+ int iParm = 1;
+ for (String sColName : _keys)
+ {
+ final String val = String.valueOf(_currentRow.get(sColName));
+ _PSdeleteRow.setString(iParm++, val);
+ }
- protected boolean changeStatusToDone ()
+ try
+ {
+ getDbConn().execUpdWait(_PSdeleteRow, 5);
+ getDbConn().commit();
+ return true;
+ }
+ catch (Exception e)
+ {
+ _logger.debug("Delete row has failed. Rolling back!!", e);
+ }
+
+ try
+ {
+ getDbConn().rollback();
+ }
+ catch (Exception e)
+ {
+ _logger.debug("Unable to rollback delete row", e);
+ }
+ }
+ catch (Exception e)
{
- return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
- } // ________________________________
+ _logger.debug("Unexpected exception.", e);
+ }
+ return false;
+ } // ________________________________
- protected boolean changeStatusToError ()
+ protected String getStatus (ROW_STATE p_oState)
+ {
+ int iPos = p_oState.ordinal();
+ return _inProcessVals.substring(iPos, ++iPos);
+ } // ________________________________
+
+ protected boolean changeStatusToWorking ()
+ {
+ return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+ } // ________________________________
+
+ protected boolean changeStatusToDone ()
+ {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+ } // ________________________________
+
+ protected boolean changeStatusToError ()
+ {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+ } // ________________________________
+
+ protected boolean changeStatus (ROW_STATE fromState, ROW_STATE toState)
+ {
+ try
{
- return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
- } // ________________________________
+ getDbConn();
+ }
+ catch (Exception e)
+ {
+ _logger.debug("Unable to get DB connection.", e);
+ throw new IllegalStateException("Unable to get DB connection.", e);
+ }
- protected boolean changeStatus (ROW_STATE fromState, ROW_STATE toState)
+ try
{
- try
- {
- getDbConn();
- }
- catch (Exception e)
- {
- _logger.debug("Unable to get DB connection.", e);
- throw new IllegalStateException("Unable to get DB connection.", e);
- }
+ int iParm = 3;
+ for (String sColName : _keys)
+ {
+ Object oVal = String.valueOf(_currentRow.get(sColName));
+ _PSupdate.setObject(iParm++, oVal);
+ }
- try
+ try
+ {
+ _PSupdate.setString(1, getStatus(toState));
+ _PSupdate.setString(2, getStatus(fromState));
+ final int count = getDbConn().execUpdWait(_PSupdate, 5);
+ if (count == 1)
{
- int iParm = 3;
- for (String sColName : _keys)
- {
- Object oVal = String.valueOf(_currentRow.get(sColName));
- _PSupdate.setObject(iParm++, oVal);
- }
+ getDbConn().commit();
- try
- {
- _PSupdate.setString(1, getStatus(toState));
- _PSupdate.setString(2, getStatus(fromState));
- final int count = getDbConn().execUpdWait(_PSupdate, 5);
- if (count == 1)
- {
- getDbConn().commit();
+ if (_logger.isDebugEnabled())
+ _logger.debug("Successfully changed row state from "
+ + fromState + " to " + toState + ".");
- if (_logger.isDebugEnabled())
- _logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
-
- return true;
- }
- else
- {
- _logger.warn("Cannot change row state from " + fromState + " to " + toState + ". Number of rows in state " + fromState + " = " + count);
- return false;
- }
- }
- catch (Exception e)
- {
- final String message = "Row status change to " + toState + " has failed. Rolling back!!" ;
- _logger.error(message);
- _logger.debug(message, e);
- }
-
- try
- {
- getDbConn().rollback();
- }
- catch (Exception e)
- {
- final String message = "Unable to rollback row status change to " + fromState ;
- _logger.error(message) ;
- _logger.debug(message, e) ;
- }
+ return true;
}
- catch (Exception e)
+ else
{
- final String message = "Unexpected exception." ;
- _logger.error(message) ;
- _logger.debug(message, e) ;
+ _logger.warn("Cannot change row state from " + fromState
+ + " to " + toState + ". Number of rows in state "
+ + fromState + " = " + count);
+ return false;
}
+ }
+ catch (Exception e)
+ {
+ final String message = "Row status change to " + toState
+ + " has failed. Rolling back!!";
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
- return false;
- } // ________________________________
+ try
+ {
+ getDbConn().rollback();
+ }
+ catch (Exception e)
+ {
+ final String message = "Unable to rollback row status change to "
+ + fromState;
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
+ }
+ catch (Exception e)
+ {
+ final String message = "Unexpected exception.";
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
- /**
- * Default gateway action for SQL table rows <p/>It will just drop the
- * result set contents into a Message
- *
- * @author <a
- * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
- * @since Version 4.0
- *
- */
- public static class PackageRowContents
+ return false;
+ } // ________________________________
+
+ /**
+ * Default gateway action for SQL table rows <p/>It will just drop the
+ * result set contents into a Message
+ *
+ * @author <a
+ * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+ public static class PackageRowContents
+ {
+ public Message process (Object obj)
{
- public Message process (Object obj)
- {
- if (!(obj instanceof Serializable))
- throw new IllegalArgumentException("Object must be instance of Map");
+ if (!(obj instanceof Serializable))
+ throw new IllegalArgumentException(
+ "Object must be instance of Map");
- Message message = MessageFactory.getInstance().getMessage();
- org.jboss.soa.esb.message.Properties props = message
- .getProperties();
+ Message message = MessageFactory.getInstance().getMessage();
+ org.jboss.soa.esb.message.Properties props = message
+ .getProperties();
- props.setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);
+ props.setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);
- return message;
- }
- } // ____________________________________________________
+ return message;
+ }
+ } // ____________________________________________________
- protected final static Logger _logger = Logger
- .getLogger(SqlTableGatewayListener.class);
+ protected final static Logger _logger = Logger
+ .getLogger(SqlTableGatewayListener.class);
- protected ConfigTree _config;
+ protected ConfigTree _config;
- protected long _sleepBetweenPolls; // milliseconds
+ protected long _sleepBetweenPolls; // milliseconds
- protected String _targetServiceCategory, _targetServiceName;
+ protected String _targetServiceCategory, _targetServiceName;
- protected Collection<EPR> _targetEprs;
+ protected Collection<EPR> _targetEprs;
- protected String _composerName;
+ protected String _composerName;
- protected Class _composerClass;
+ protected Class _composerClass;
- protected Object _composer;
+ protected Object _composer;
- protected Method _processMethod;
+ protected Method _processMethod;
- protected Courier _courier;
+ protected Courier _courier;
- protected String _driver, _url, _user, _password;
+ protected String _driver, _url, _user, _password;
- protected String _tableName, _selectFields, _keyFields;
+ protected String _tableName, _selectFields, _keyFields;
- protected String _where, _orderBy;
+ protected String _where, _orderBy;
- protected String _inProcessField, _inProcessVals;
+ protected String _inProcessField, _inProcessVals;
- protected boolean _deleteAfterOK;
+ protected boolean _deleteAfterOK;
- protected String[] _columns, _keys;
+ protected String[] _columns, _keys;
- protected PreparedStatement _PSscan, _PSupdate, _PSdeleteRow;
+ protected PreparedStatement _PSscan, _PSupdate, _PSdeleteRow;
- protected JdbcCleanConn _dbConn;
+ protected JdbcCleanConn _dbConn;
- protected Map<String, Object> _currentRow;
+ protected Map<String, Object> _currentRow;
- public static enum ROW_STATE
- {
- Pending, Working, Error, Done
- }
+ public static enum ROW_STATE
+ {
+ Pending, Working, Error, Done
+ }
- public static final String DEFAULT_IN_PROCESS_STATES = "PWED";
+ public static final String DEFAULT_IN_PROCESS_STATES = "PWED";
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/FileCourierUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/FileCourierUnitTest.java 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/FileCourierUnitTest.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -26,6 +26,9 @@
import java.io.FileFilter;
import java.io.PrintStream;
import java.net.URI;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.TreeMap;
import java.util.UUID;
import junit.framework.Assert;
@@ -36,6 +39,7 @@
import org.jboss.soa.esb.addressing.eprs.FileEpr;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.tests.propertymanager.PropertyManagerUnitTest;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.TwoWayCourier;
@@ -45,6 +49,8 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
/**
* Tests for internal FileCourier class
* @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
@@ -67,6 +73,9 @@
{
_logger.debug("tmp directory = <"+_tmpDir+">");
purgeStaleFiles();
+
+ PropertyManager pm = ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE);
+ pm.setProperty("org.jboss.soa.esb.filter.0", "org.jboss.internal.soa.esb.message.filter.MetaDataFilter");
}
@AfterClass
@@ -103,8 +112,6 @@
@Test
public void testMessageAttributes() throws Exception
{
- ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE).setProperty("org.jboss.soa.esb.courier.filter.0", "org.jboss.internal.soa.esb.message.metadata.MetaDataFilter");
-
String contents = "This is the text that travels in the Message body";
// toEpr for files must be a directory
Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GatewayFilterUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GatewayFilterUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GatewayFilterUnitTest.java 2007-07-07 20:46:11 UTC (rev 13221)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.tests.propertymanager.PropertyManagerUnitTest;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.filter.FilterManager;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+/**
+ * Tests for internal FileCourier class
+ *
+ * @author <a
+ * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+public class GatewayFilterUnitTest
+{
+ @BeforeClass
+ public static void setUp () throws Exception
+ {
+ PropertyManager pm = ModulePropertyManager
+ .getPropertyManager(ModulePropertyManager.FILTER_MODULE);
+ pm.setProperty("org.jboss.soa.esb.filter.0",
+ "org.jboss.internal.soa.esb.message.filter.MetaDataFilter");
+ pm.setProperty("org.jboss.soa.esb.filter.1",
+ "org.jboss.internal.soa.esb.message.filter.GatewayFilter");
+ }
+
+ public static junit.framework.Test suite ()
+ {
+ return new JUnit4TestAdapter(GatewayFilterUnitTest.class);
+ }
+
+ @Test
+ public void testMessageAttributes () throws Exception
+ {
+ ConfigTree config = new ConfigTree("test");
+ Message msg = MessageFactory.getInstance().getMessage();
+ msg.getBody().setByteArray("Hello World".getBytes());
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(Environment.ORIGINAL_FILE, new File("foobar"));
+ params.put(Environment.GATEWAY_CONFIG, config);
+
+ Message output = FilterManager.getInstance().doOutputWork(msg, params);
+
+ Assert.assertNotNull(output);
+ Assert.assertEquals(output.getProperties().getProperty(
+ Environment.ORIGINAL_FILE_NAME_MSG_PROP), "foobar");
+ Assert.assertNull(output.getProperties().getProperty(
+ Environment.ORIGINAL_QUEUE_NAME_MSG_PROP));
+ }
+}
Modified: labs/jbossesb/trunk/product/services/jbossesb/src/test/resources/jbossesb-unittest-properties.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/test/resources/jbossesb-unittest-properties.xml 2007-07-07 20:42:33 UTC (rev 13220)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/test/resources/jbossesb-unittest-properties.xml 2007-07-07 20:46:11 UTC (rev 13221)
@@ -83,6 +83,7 @@
</properties>
<properties name="filters">
- <property name="org.jboss.soa.esb.courier.filter.1" value="org.jboss.internal.soa.esb.message.metadata.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.1" value="org.jboss.internal.soa.esb.message.filter.MetaDataFilter"/>
+ <property name="org.jboss.soa.esb.filter.2" value="org.jboss.internal.soa.esb.message.filter.GatewayFilter"/>
</properties>
</esb>
More information about the jboss-svn-commits
mailing list