[jboss-svn-commits] JBL Code SVN: r13354 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Jul 11 07:36:13 EDT 2007
Author: tfennelly
Date: 2007-07-11 07:36:12 -0400 (Wed, 11 Jul 2007)
New Revision: 13354
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
Log:
Just formatting so I can read it before I try to refactor for JBESB-575. Man this is scary stuff :-(
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-07-11 10:55:44 UTC (rev 13353)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/AbstractFileGateway.java 2007-07-11 11:36:12 UTC (rev 13354)
@@ -64,566 +64,487 @@
* Base class for all file gateways: local filesystem, ftp, sftp and ftps.
* <p/>Implementations for file manipulation (getFileList, getFileContents,
* renameFile and deleteFile) must be provided by factory
- *
+ *
* @author <a
* href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
- *
*/
public abstract class AbstractFileGateway extends
- AbstractThreadedManagedLifecycle
-{
- abstract File[] getFileList () throws GatewayException;
+ AbstractThreadedManagedLifecycle {
+ abstract File[] getFileList() throws GatewayException;
- abstract byte[] getFileContents (File file) throws GatewayException;
+ abstract byte[] getFileContents(File file) throws GatewayException;
- abstract boolean renameFile (File from, File to) throws GatewayException;
+ abstract boolean renameFile(File from, File to) throws GatewayException;
- abstract boolean deleteFile (File file) throws GatewayException;
+ abstract boolean deleteFile(File file) throws GatewayException;
- abstract void seeIfOkToWorkOnDir (File p_oDir) throws GatewayException;
+ abstract void seeIfOkToWorkOnDir(File p_oDir) throws GatewayException;
- abstract void getDefaultComposer () throws GatewayException;
+ abstract void getDefaultComposer() throws GatewayException;
- abstract void bytesToFile (byte[] bytes, File file) throws GatewayException;
+ abstract void bytesToFile(byte[] bytes, File file) throws GatewayException;
protected AbstractFileGateway(ConfigTree config)
- throws ConfigurationException, RegistryException, GatewayException
- {
- super(config);
- _config = config;
- _sleepBetweenPolls = 10000; // milliseconds
- checkMyParms();
+ throws ConfigurationException, RegistryException, GatewayException {
+ super(config);
+ _config = config;
+ _sleepBetweenPolls = 10000; // milliseconds
+ checkMyParms();
} // __________________________________
/**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while initialisation.
- */
- protected void doInitialise () throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
- _targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <"
- + _targetServiceName + "> not found in registry");
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException(
- "Unexpected registry exception", re);
- }
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise() throws ManagedLifecycleException {
+ try {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re) {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
}
/**
- * Execute on the thread.
- */
- protected void doRun ()
- {
+ * Execute on the thread.
+ */
+ protected void doRun() {
- EPR replyEpr = null;
- Message replyMsg = null;
+ EPR replyEpr = null;
+ Message replyMsg = null;
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
- do
- {
- File[] fileList;
- try
- {
- fileList = getFileList();
- }
- catch (GatewayException e)
- {
- _logger.error("Can't retrieve file list", e);
- continue;
- }
+ do {
+ File[] fileList;
+ try {
+ fileList = getFileList();
+ }
+ catch (GatewayException e) {
+ _logger.error("Can't retrieve file list", e);
+ continue;
+ }
- for (File fileIn : fileList)
- {
- // Try to rename - if unsuccessful, somebody else got it first
- File fileWork = getWorkFileName(fileIn, _workingSuffix);
- try
- {
- if (!renameFile(fileIn, fileWork))
- continue;
- }
- catch (GatewayException e)
- {
- _logger.error("Problems renaming file " + fileIn + " to "
- + fileWork);
- continue;
- }
+ for (File fileIn : fileList) {
+ // Try to rename - if unsuccessful, somebody else got it first
+ File fileWork = getWorkFileName(fileIn, _workingSuffix);
+ try {
+ if (!renameFile(fileIn, fileWork))
+ continue;
+ }
+ catch (GatewayException e) {
+ _logger.error("Problems renaming file " + fileIn + " to "
+ + fileWork);
+ continue;
+ }
- Throwable thrown = null;
- String text = null;
- try
- {
- Object obj = _processMethod.invoke(_composer, new Object[]
- { fileWork });
- if (null == obj)
- {
- _logger.warn("Action class method <"
- + _processMethod.getName()
- + "> returned a null object");
- continue;
- }
- boolean bSent = false;
+ Throwable thrown = null;
+ String text = null;
+ try {
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ {fileWork});
+ if (null == obj) {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ boolean bSent = false;
- Message outMessage = (Message) obj;
- Map<String, Object> params = new HashMap<String, Object>();
+ Message outMessage = (Message) obj;
+ Map<String, Object> params = new HashMap<String, Object>();
- params.put(Environment.ORIGINAL_FILE, fileIn);
- params.put(Environment.GATEWAY_CONFIG, _config);
-
- outMessage = FilterManager.getInstance().doOutputWork(outMessage, params);
-
- for (EPR current : _targetEprs)
- {
- if (current instanceof FileEpr)
- {
- try
- {
- FileEpr fpr = (FileEpr) current;
- FileEpr newEpr = new FileEpr(fpr.getURL());
- newEpr.setPostDelete(false);
- newEpr.setPostDirectory(fpr.getURL().getFile());
- newEpr.setPostSuffix(fpr.getInputSuffix());
- current = newEpr;
- }
- catch (Exception e)
- {
- _logger.error("Problems with file EPR", e);
- }
- }
- _courier = getCourier(current);
- try
- {
- replyEpr = null;
- outMessage.getHeader().getCall().setTo(current);
- if (_maxMillisForResponse > 0)
- {
- replyEpr = CourierUtil
- .getDefaultReplyToEpr(current);
- outMessage.getHeader().getCall().setReplyTo(
- replyEpr);
- }
- if (_courier.deliver(outMessage))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier);
- }
+ params.put(Environment.ORIGINAL_FILE, fileIn);
+ params.put(Environment.GATEWAY_CONFIG, _config);
- }
- if (!bSent)
- {
- text = "Target service <" + _targetServiceCategory
- + "," + _targetServiceName
- + "> is not registered";
- thrown = new Exception(text);
- }
- else if (null != replyEpr)
- {
- TwoWayCourier replyCourier = CourierFactory
- .getPickupCourier(replyEpr);
- try
- {
- replyMsg = replyCourier
- .pickup(_maxMillisForResponse);
- _responderMethod.invoke(_composer, new Object[]
- { replyMsg, fileIn });
- }
- catch (CourierTimeoutException e)
- {
- thrown = e;
- text = "Expected response was not received from invoked service";
- replyMsg = MessageFactory.getInstance()
- .getMessage();
- String timedOut = "Service <"
- + _targetServiceCategory + ","
- + "_targetServiceName"
- + "> timed out without sending response";
- replyMsg.getBody()
- .setByteArray(timedOut.getBytes());
- _responderMethod.invoke(_composer, new Object[]
- { replyMsg, fileIn });
- }
- finally
- {
- if (null != replyCourier)
- CourierUtil.cleanCourier(replyCourier);
- }
- }
- }
- catch (InvocationTargetException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
+ outMessage = FilterManager.getInstance().doOutputWork(outMessage, params);
- }
- catch (IllegalAccessException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
- }
- catch (ClassCastException e)
- {
- thrown = e;
- text = "Action class method <" + _processMethod.getName()
- + "> returned a non Message object";
- }
- catch (CourierException e)
- {
- thrown = e;
- if (null != _courier)
- text = "Courier <" + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED";
- else
- text = "NULL courier can't deliverAsync Message";
- }
- catch (MalformedEPRException e)
- {
- thrown = e;
- if (null != _courier)
- text = "Courier <"
- + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED with malformed EPR.";
- else
- text = "NULL courier can't deliverAsync Message";
- }
+ for (EPR current : _targetEprs) {
+ if (current instanceof FileEpr) {
+ try {
+ FileEpr fpr = (FileEpr) current;
+ FileEpr newEpr = new FileEpr(fpr.getURL());
+ newEpr.setPostDelete(false);
+ newEpr.setPostDirectory(fpr.getURL().getFile());
+ newEpr.setPostSuffix(fpr.getInputSuffix());
+ current = newEpr;
+ }
+ catch (Exception e) {
+ _logger.error("Problems with file EPR", e);
+ }
+ }
+ _courier = getCourier(current);
+ try {
+ replyEpr = null;
+ outMessage.getHeader().getCall().setTo(current);
+ if (_maxMillisForResponse > 0) {
+ replyEpr = CourierUtil
+ .getDefaultReplyToEpr(current);
+ outMessage.getHeader().getCall().setReplyTo(
+ replyEpr);
+ }
+ if (_courier.deliver(outMessage)) {
+ bSent = true;
+ break;
+ }
+ }
+ finally {
+ CourierUtil.cleanCourier(_courier);
+ }
- if (null == thrown)
- {
- File fileOK = new File(_postProcessDirectory, fileIn
- .getName()
- + _postProcessSuffix);
- if (_deleteAfterOK)
- {
- try
- {
- deleteFile(fileWork);
- }
- catch (GatewayException e)
- {
- _logger
- .error(
- "File "
- + fileIn
- + " has been processed and renamed to "
- + fileWork
- + ", but there were problems deleting it from the input directory ",
- e);
- }
- }
- else
- {
- try
- {
- renameFile(fileWork, fileOK);
- }
- catch (GatewayException e)
- {
- _logger
- .error(
- "File "
- + fileIn
- + " has been processed and renamed to "
- + fileWork
- + ", but there were problems renaming it to "
- + fileOK, e);
- }
- }
- }
- else
- {
- thrown.printStackTrace();
- _logger.error(text, thrown);
- File fileError = new File(_errorDirectory, fileIn.getName()
- + _errorSuffix);
- try
- {
- deleteFile(fileError);
- }
- catch (GatewayException e)
- {
- _logger.warn("File : " + fileError + " did not exist.");
- }
- try
- {
- renameFile(fileWork, fileError);
- }
- catch (GatewayException e)
- {
- _logger.error("Problems renaming file " + fileWork
- + " to " + fileError, e);
- }
- }
- }
- }
- while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
- _sleepBetweenPolls));
+ }
+ if (!bSent) {
+ text = "Target service <" + _targetServiceCategory
+ + "," + _targetServiceName
+ + "> is not registered";
+ thrown = new Exception(text);
+ } else if (null != replyEpr) {
+ TwoWayCourier replyCourier = CourierFactory
+ .getPickupCourier(replyEpr);
+ try {
+ replyMsg = replyCourier
+ .pickup(_maxMillisForResponse);
+ _responderMethod.invoke(_composer, new Object[]
+ {replyMsg, fileIn});
+ }
+ catch (CourierTimeoutException e) {
+ thrown = e;
+ text = "Expected response was not received from invoked service";
+ replyMsg = MessageFactory.getInstance()
+ .getMessage();
+ String timedOut = "Service <"
+ + _targetServiceCategory + ","
+ + "_targetServiceName"
+ + "> timed out without sending response";
+ replyMsg.getBody()
+ .setByteArray(timedOut.getBytes());
+ _responderMethod.invoke(_composer, new Object[]
+ {replyMsg, fileIn});
+ }
+ finally {
+ if (null != replyCourier)
+ CourierUtil.cleanCourier(replyCourier);
+ }
+ }
+ }
+ catch (InvocationTargetException e) {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
- if (_logger.isDebugEnabled())
- {
- _logger
- .debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread "
- + Thread.currentThread().getName());
- }
+ }
+ catch (IllegalAccessException e) {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+ }
+ catch (ClassCastException e) {
+ thrown = e;
+ text = "Action class method <" + _processMethod.getName()
+ + "> returned a non Message object";
+ }
+ catch (CourierException e) {
+ thrown = e;
+ if (null != _courier)
+ text = "Courier <" + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED";
+ else
+ text = "NULL courier can't deliverAsync Message";
+ }
+ catch (MalformedEPRException e) {
+ thrown = e;
+ if (null != _courier)
+ text = "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED with malformed EPR.";
+ else
+ text = "NULL courier can't deliverAsync Message";
+ }
+
+ if (null == thrown) {
+ File fileOK = new File(_postProcessDirectory, fileIn
+ .getName()
+ + _postProcessSuffix);
+ if (_deleteAfterOK) {
+ try {
+ deleteFile(fileWork);
+ }
+ catch (GatewayException e) {
+ _logger
+ .error(
+ "File "
+ + fileIn
+ + " has been processed and renamed to "
+ + fileWork
+ + ", but there were problems deleting it from the input directory ",
+ e);
+ }
+ } else {
+ try {
+ renameFile(fileWork, fileOK);
+ }
+ catch (GatewayException e) {
+ _logger
+ .error(
+ "File "
+ + fileIn
+ + " has been processed and renamed to "
+ + fileWork
+ + ", but there were problems renaming it to "
+ + fileOK, e);
+ }
+ }
+ } else {
+ thrown.printStackTrace();
+ _logger.error(text, thrown);
+ File fileError = new File(_errorDirectory, fileIn.getName()
+ + _errorSuffix);
+ try {
+ deleteFile(fileError);
+ }
+ catch (GatewayException e) {
+ _logger.warn("File : " + fileError + " did not exist.");
+ }
+ try {
+ renameFile(fileWork, fileError);
+ }
+ catch (GatewayException e) {
+ _logger.error("Problems renaming file " + fileWork
+ + " to " + fileError, e);
+ }
+ }
+ }
+ }
+ while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
+ _sleepBetweenPolls));
+
+ if (_logger.isDebugEnabled()) {
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread "
+ + Thread.currentThread().getName());
+ }
} // ________________________________
- protected File getWorkFileName (File fileIn, String suffix)
- {
- return new File(fileIn.toString() + _workingSuffix);
+ protected File getWorkFileName(File fileIn, String suffix) {
+ return new File(fileIn.toString() + _workingSuffix);
}
/*
* Extracted to simplify testing
*/
- protected Courier getCourier (EPR current) throws CourierException,
- MalformedEPRException
- {
- return CourierFactory.getCourier(current);
+ protected Courier getCourier(EPR current) throws CourierException,
+ MalformedEPRException {
+ return CourierFactory.getCourier(current);
}
/**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while destroying.
- */
- protected void doDestroy () throws ManagedLifecycleException
- {
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy() throws ManagedLifecycleException {
}
/*
* Is the input suffix valid for this type of gateway?
*/
- protected void checkInputSuffix () throws ConfigurationException
- {
- if (_inputSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_INPUT_SFX_TAG + " attribute");
+ protected void checkInputSuffix() throws ConfigurationException {
+ if (_inputSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_INPUT_SFX_TAG + " attribute");
}
/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- private void checkMyParms () throws ConfigurationException,
- RegistryException, GatewayException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ private void checkMyParms() throws ConfigurationException,
+ RegistryException, GatewayException {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- // Polling interval
- String sAux = _config
- .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ // Polling interval
+ String sAux = _config
+ .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- if (!Util.isNullString(sAux))
- {
- try
- {
- _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid poll latency - keeping default of "
- + (_sleepBetweenPolls / 1000));
- }
- }
- else
- {
- _logger.warn("No value specified for: "
- + ListenerTagNames.POLL_LATENCY_SECS_TAG
- + " - Using default of " + (_sleepBetweenPolls / 1000));
- }
+ if (!Util.isNullString(sAux)) {
+ try {
+ _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ }
+ catch (NumberFormatException e) {
+ _logger.warn("Invalid poll latency - keeping default of "
+ + (_sleepBetweenPolls / 1000));
+ }
+ } else {
+ _logger.warn("No value specified for: "
+ + ListenerTagNames.POLL_LATENCY_SECS_TAG
+ + " - Using default of " + (_sleepBetweenPolls / 1000));
+ }
- resolveComposerClass();
+ resolveComposerClass();
- boolean hasResponder = _responderMethod != null;
- _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait(_config,
- _logger, hasResponder);
- try
- {
- // INPUT directory and suffix (used for FileFilter)
- String url = _config.getAttribute(ListenerTagNames.URL_TAG);
- String sInpDir = (null != url) ? new URL(url).getFile()
- : ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_INPUT_DIR_TAG, null);
- _inputDirectory = fileFromString(sInpDir);
- seeIfOkToWorkOnDir(_inputDirectory);
+ boolean hasResponder = _responderMethod != null;
+ _maxMillisForResponse = ListenerUtil.getMaxMillisGatewayWait(_config,
+ _logger, hasResponder);
+ try {
+ // INPUT directory and suffix (used for FileFilter)
+ String url = _config.getAttribute(ListenerTagNames.URL_TAG);
+ String sInpDir = (null != url) ? new URL(url).getFile()
+ : ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_INPUT_DIR_TAG, null);
+ _inputDirectory = fileFromString(sInpDir);
+ seeIfOkToWorkOnDir(_inputDirectory);
- _inputSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_INPUT_SFX_TAG, null);
- _inputSuffix = _inputSuffix.trim();
+ _inputSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_INPUT_SFX_TAG, null);
+ _inputSuffix = _inputSuffix.trim();
- checkInputSuffix();
+ checkInputSuffix();
- // WORK suffix (will rename in input directory)
- _workingSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_WORK_SFX_TAG, ".esbWork").trim();
- if (_workingSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_WORK_SFX_TAG + " attribute");
+ // WORK suffix (will rename in input directory)
+ _workingSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_WORK_SFX_TAG, ".esbWork").trim();
+ if (_workingSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_WORK_SFX_TAG + " attribute");
- if (_inputSuffix.equals(_workingSuffix))
- throw new ConfigurationException(
- "Work suffix must differ from input suffix <"
- + _workingSuffix + ">");
+ if (_inputSuffix.equals(_workingSuffix))
+ throw new ConfigurationException(
+ "Work suffix must differ from input suffix <"
+ + _workingSuffix + ">");
- // ERROR directory and suffix (defaults to input dir and
- // ".esbError"
- // suffix)
- String sErrDir = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_ERROR_DIR_TAG, sInpDir);
- _errorDirectory = fileFromString(sErrDir);
- seeIfOkToWorkOnDir(_errorDirectory);
+ // ERROR directory and suffix (defaults to input dir and
+ // ".esbError"
+ // suffix)
+ String sErrDir = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_ERROR_DIR_TAG, sInpDir);
+ _errorDirectory = fileFromString(sErrDir);
+ seeIfOkToWorkOnDir(_errorDirectory);
- _errorSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_ERROR_SFX_TAG, ".esbError").trim();
- if (_errorSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_ERROR_SFX_TAG + " attribute");
- if (_errorDirectory.equals(_inputDirectory)
- && _inputSuffix.equals(_errorSuffix))
- throw new ConfigurationException(
- "Error suffix must differ from input suffix <"
- + _errorSuffix + ">");
+ _errorSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_ERROR_SFX_TAG, ".esbError").trim();
+ if (_errorSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_ERROR_SFX_TAG + " attribute");
+ if (_errorDirectory.equals(_inputDirectory)
+ && _inputSuffix.equals(_errorSuffix))
+ throw new ConfigurationException(
+ "Error suffix must differ from input suffix <"
+ + _errorSuffix + ">");
- // Do users wish to delete files that were processed OK ?
- String sPostDel = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_DEL_TAG, "false").trim();
- _deleteAfterOK = Boolean.parseBoolean(sPostDel);
- if (_deleteAfterOK)
- return;
+ // Do users wish to delete files that were processed OK ?
+ String sPostDel = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_DEL_TAG, "false").trim();
+ _deleteAfterOK = Boolean.parseBoolean(sPostDel);
+ if (_deleteAfterOK)
+ return;
- // POST (done) directory and suffix (defaults to input dir and
- // ".esbDone" suffix)
- String sPostDir = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_DIR_TAG, sInpDir);
- _postProcessDirectory = fileFromString(sPostDir);
- seeIfOkToWorkOnDir(_postProcessDirectory);
- _postProcessSuffix = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.FILE_POST_SFX_TAG, ".esbDone").trim();
+ // POST (done) directory and suffix (defaults to input dir and
+ // ".esbDone" suffix)
+ String sPostDir = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_DIR_TAG, sInpDir);
+ _postProcessDirectory = fileFromString(sPostDir);
+ seeIfOkToWorkOnDir(_postProcessDirectory);
+ _postProcessSuffix = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.FILE_POST_SFX_TAG, ".esbDone").trim();
- if (_postProcessDirectory.equals(_inputDirectory))
- {
- if (_postProcessSuffix.length() < 1)
- throw new ConfigurationException("Invalid "
- + ListenerTagNames.FILE_POST_SFX_TAG + " attribute");
- if (_postProcessSuffix.equals(_inputSuffix))
- throw new ConfigurationException(
- "Post process suffix must differ from input suffix <"
- + _postProcessSuffix + ">");
- }
- }
- catch (GatewayException ex)
- {
- throw ex;
- }
- catch (MalformedURLException ex)
- {
- throw new ConfigurationException(ex);
- }
+ if (_postProcessDirectory.equals(_inputDirectory)) {
+ if (_postProcessSuffix.length() < 1)
+ throw new ConfigurationException("Invalid "
+ + ListenerTagNames.FILE_POST_SFX_TAG + " attribute");
+ if (_postProcessSuffix.equals(_inputSuffix))
+ throw new ConfigurationException(
+ "Post process suffix must differ from input suffix <"
+ + _postProcessSuffix + ">");
+ }
+ }
+ catch (GatewayException ex) {
+ throw ex;
+ }
+ catch (MalformedURLException ex) {
+ throw new ConfigurationException(ex);
+ }
} // ________________________________
- protected void resolveComposerClass () throws ConfigurationException,
- GatewayException
- {
- String sProcessMethod = null;
- String sResponderMethod = null;
- try
- {
- _composerName = _config
- .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config
- .getAttribute(
- ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
- "process");
- sResponderMethod = _config
- .getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
- }
- else
- {
- getDefaultComposer();
- sProcessMethod = "process";
- sResponderMethod = "respond";
- }
+ protected void resolveComposerClass() throws ConfigurationException,
+ GatewayException {
+ String sProcessMethod = null;
+ String sResponderMethod = null;
+ try {
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName) { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ {ConfigTree.class});
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ sResponderMethod = _config
+ .getAttribute(ListenerTagNames.GATEWAY_RESPONDER_METHOD_TAG);
+ } else {
+ getDefaultComposer();
+ sProcessMethod = "process";
+ sResponderMethod = "respond";
+ }
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[]
- { Object.class });
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ {Object.class});
- _responderMethod = (null == sResponderMethod) ? null
- : _composerClass.getMethod(sResponderMethod, new Class[]
- { Message.class, File.class });
- }
- catch (InvocationTargetException ex)
- {
- throw new ConfigurationException(ex);
- }
- catch (IllegalAccessException ex)
- {
- throw new ConfigurationException(ex);
- }
- catch (InstantiationException ex)
- {
- throw new ConfigurationException(ex);
- }
- catch (NoSuchMethodException ex)
- {
- throw new ConfigurationException(ex);
- }
- catch (ClassNotFoundException ex)
- {
- throw new ConfigurationException(ex);
- }
+ _responderMethod = (null == sResponderMethod) ? null
+ : _composerClass.getMethod(sResponderMethod, new Class[]
+ {Message.class, File.class});
+ }
+ catch (InvocationTargetException ex) {
+ throw new ConfigurationException(ex);
+ }
+ catch (IllegalAccessException ex) {
+ throw new ConfigurationException(ex);
+ }
+ catch (InstantiationException ex) {
+ throw new ConfigurationException(ex);
+ }
+ catch (NoSuchMethodException ex) {
+ throw new ConfigurationException(ex);
+ }
+ catch (ClassNotFoundException ex) {
+ throw new ConfigurationException(ex);
+ }
} // ________________________________
- private File fileFromString (String file)
- {
- try
- {
- return new File(new URI(file));
- }
- catch (Exception e)
- {
- return new File(file);
- }
+ private File fileFromString(String file) {
+ try {
+ return new File(new URI(file));
+ }
+ catch (Exception e) {
+ return new File(file);
+ }
} // ________________________________
protected final static Logger _logger = Logger
- .getLogger(AbstractFileGateway.class);
+ .getLogger(AbstractFileGateway.class);
protected ConfigTree _config;
@@ -652,7 +573,7 @@
protected File _inputDirectory, _errorDirectory, _postProcessDirectory;
protected String _inputSuffix, _postProcessSuffix, _workingSuffix,
- _errorSuffix;
+ _errorSuffix;
protected FileFilter _fileFilter;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-07-11 10:55:44 UTC (rev 13353)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-07-11 11:36:12 UTC (rev 13354)
@@ -63,418 +63,355 @@
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.ClassUtil;
-public class JmsGatewayListener extends AbstractThreadedManagedLifecycle
-{
+public class JmsGatewayListener extends AbstractThreadedManagedLifecycle {
/**
- * serial version uid for this class
- */
+ * serial version uid for this class
+ */
private static final long serialVersionUID = 5070422864110923930L;
public JmsGatewayListener(ConfigTree listenerConfig)
- throws ConfigurationException
- {
- super(listenerConfig);
- _config = listenerConfig;
- checkMyParms();
+ throws ConfigurationException {
+ super(listenerConfig);
+ _config = listenerConfig;
+ checkMyParms();
} // __________________________________
/**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while initialisation.
- */
- protected void doInitialise () throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
- _targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <"
- + _targetServiceName + "> not found in registry");
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException(
- "Unexpected registry exception", re);
- }
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise() throws ManagedLifecycleException {
+ try {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re) {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
- try
- {
- prepareMessageReceiver();
- }
- catch (final ConnectionException ce)
- {
- throw new ManagedLifecycleException(
- "Unexpected connection exception from prepareMessageReceiver",
- ce);
- }
- catch (final JMSException jmse)
- {
- throw new ManagedLifecycleException(
- "Unexpected JMS error from prepareMessageReceiver", jmse);
- }
- catch (final ConfigurationException ce)
- {
- throw new ManagedLifecycleException(
- "Unexpected configuration exception from prepareMessageReceiver",
- ce);
- }
+ try {
+ prepareMessageReceiver();
+ }
+ catch (final ConnectionException ce) {
+ throw new ManagedLifecycleException(
+ "Unexpected connection exception from prepareMessageReceiver",
+ ce);
+ }
+ catch (final JMSException jmse) {
+ throw new ManagedLifecycleException(
+ "Unexpected JMS error from prepareMessageReceiver", jmse);
+ }
+ catch (final ConfigurationException ce) {
+ throw new ManagedLifecycleException(
+ "Unexpected configuration exception from prepareMessageReceiver",
+ ce);
+ }
- if (_serviceName != null)
- {
- try
- {
- RegistryUtil.register(_config, _myEpr);
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException(
- "Unexpected error during registration for epr "
- + _myEpr, re);
- }
- }
+ if (_serviceName != null) {
+ try {
+ RegistryUtil.register(_config, _myEpr);
+ }
+ catch (final RegistryException re) {
+ throw new ManagedLifecycleException(
+ "Unexpected error during registration for epr "
+ + _myEpr, re);
+ }
+ }
}
/**
- * Execute on the thread.
- */
- protected void doRun ()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " started on thread " + Thread.currentThread().getName());
- }
+ * Execute on the thread.
+ */
+ protected void doRun() {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " started on thread " + Thread.currentThread().getName());
+ }
- while (isRunning())
- {
- javax.jms.Message msgIn = receiveOne();
- if (null != msgIn)
- {
- try
- {
- Object obj = _processMethod.invoke(_composer, new Object[]
- { msgIn });
- if (null == obj)
- {
- _logger.warn("Action class method <"
- + _processMethod.getName()
- + "> returned a null object");
- continue;
- }
- // try to deliverAsync the composed message, using the
- // appropriate courier
- // to the target service
-
- Map<String, Object> params = new HashMap<String, Object>();
-
- params.put(Environment.GATEWAY_CONFIG, _config);
-
- obj = FilterManager.getInstance().doOutputWork((Message) obj, params);
-
- try
- {
- boolean bSent = false;
- for (EPR current : _targetEprs)
- {
- _courier = CourierFactory.getCourier(current);
- try
- {
- if (_courier.deliver((Message) obj))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier);
- }
- }
- if (!bSent)
- {
- String text = "Target service <"
- + _targetServiceCategory + ","
- + _targetServiceName
- + "> is not registered";
- throw new Exception(text);
- }
- }
- catch (ClassCastException e)
- {
- _logger.error("Action class method <"
- + _processMethod.getName()
- + "> returned a non Message object", e);
- continue;
- }
- catch (CourierException e)
- {
- String text = (null != _courier) ? "Courier <"
- + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED"
- : "NULL courier can't deliverAsync Message";
- _logger.error(text, e);
- continue;
- }
- continue;
- }
- catch (InvocationTargetException e)
- {
- _logger.error("Problems invoking method <"
- + _processMethod.getName() + ">", e);
- }
- catch (IllegalAccessException e)
- {
- _logger.error("Problems invoking method <"
- + _processMethod.getName() + ">", e);
- }
- catch (Exception e)
- {
- _logger.error("Unexpected problem", e);
- }
- }
- }
+ while (isRunning()) {
+ javax.jms.Message msgIn = receiveOne();
+ if (null != msgIn) {
+ try {
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ {msgIn});
+ if (null == obj) {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ // try to deliverAsync the composed message, using the
+ // appropriate courier
+ // to the target service
- _logger.debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread " + Thread.currentThread().getName());
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(Environment.GATEWAY_CONFIG, _config);
+
+ obj = FilterManager.getInstance().doOutputWork((Message) obj, params);
+
+ try {
+ boolean bSent = false;
+ for (EPR current : _targetEprs) {
+ _courier = CourierFactory.getCourier(current);
+ try {
+ if (_courier.deliver((Message) obj)) {
+ bSent = true;
+ break;
+ }
+ }
+ finally {
+ CourierUtil.cleanCourier(_courier);
+ }
+ }
+ if (!bSent) {
+ String text = "Target service <"
+ + _targetServiceCategory + ","
+ + _targetServiceName
+ + "> is not registered";
+ throw new Exception(text);
+ }
+ }
+ catch (ClassCastException e) {
+ _logger.error("Action class method <"
+ + _processMethod.getName()
+ + "> returned a non Message object", e);
+ continue;
+ }
+ catch (CourierException e) {
+ String text = (null != _courier) ? "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED"
+ : "NULL courier can't deliverAsync Message";
+ _logger.error(text, e);
+ continue;
+ }
+ continue;
+ }
+ catch (InvocationTargetException e) {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ }
+ catch (IllegalAccessException e) {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ }
+ catch (Exception e) {
+ _logger.error("Unexpected problem", e);
+ }
+ }
+ }
+
+ _logger.debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread " + Thread.currentThread().getName());
} // ________________________________
/**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while destroying.
- */
- protected void doDestroy () throws ManagedLifecycleException
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy() throws ManagedLifecycleException {
+ if (_serviceName != null) {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
- if (_messageReceiver != null)
- {
- try
- {
- _messageReceiver.close();
- }
- catch (final JMSException jmse)
- {
- } // ignore
- }
+ if (_messageReceiver != null) {
+ try {
+ _messageReceiver.close();
+ }
+ catch (final JMSException jmse) {
+ } // ignore
+ }
- if (_queueSession != null)
- {
- _pool.closeSession(_queueSession);
- }
+ if (_queueSession != null) {
+ _pool.closeSession(_queueSession);
+ }
}
/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws ConfigurationException {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- _queueName = ListenerUtil.obtainAtt(_config,
- JMSEpr.DESTINATION_NAME_TAG, null);
+ _queueName = ListenerUtil.obtainAtt(_config,
+ JMSEpr.DESTINATION_NAME_TAG, null);
- resolveComposerClass();
+ resolveComposerClass();
- // No problem if selector is null - everything in queue will be returned
- _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
- _logger.debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG
- + " - All messages in queue will be received by this listener");
+ // No problem if selector is null - everything in queue will be returned
+ _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ _logger.debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG
+ + " - All messages in queue will be received by this listener");
} // ________________________________
- protected void resolveComposerClass () throws ConfigurationException
- {
- try
- {
- String sProcessMethod = null;
- _composerName = _config
- .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config
- .getAttribute(
- ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
- "process");
- }
- else
- {
- _composerName = PackageJmsMessageContents.class.getName();
- _composerClass = PackageJmsMessageContents.class;
- _composer = new PackageJmsMessageContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
- + "> element found in configuration"
- + " - Using default composer class : "
- + _composerName);
- }
+ protected void resolveComposerClass() throws ConfigurationException {
+ try {
+ String sProcessMethod = null;
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName) { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ {ConfigTree.class});
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ } else {
+ _composerName = PackageJmsMessageContents.class.getName();
+ _composerClass = PackageJmsMessageContents.class;
+ _composer = new PackageJmsMessageContents();
+ sProcessMethod = "process";
+ _logger
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+ + "> element found in configuration"
+ + " - Using default composer class : "
+ + _composerName);
+ }
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[]
- { Object.class });
- }
- catch (Exception ex)
- {
- throw new ConfigurationException(ex);
- }
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ {Object.class});
+ }
+ catch (Exception ex) {
+ throw new ConfigurationException(ex);
+ }
} // ________________________________
- private void prepareMessageReceiver () throws ConfigurationException,
- JMSException, ConnectionException
- {
- _queueSession = null;
- _queue = null;
+ private void prepareMessageReceiver() throws ConfigurationException,
+ JMSException, ConnectionException {
+ _queueSession = null;
+ _queue = null;
- Properties environment = new Properties();
+ Properties environment = new Properties();
- String sJndiURL = _config.getAttribute(JMSEpr.JNDI_URL_TAG);
- String sJndiContextFactory = _config
- .getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
- String sJndiPkgPrefix = _config
- .getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
- if (sJndiURL != null)
- environment.setProperty(Context.PROVIDER_URL, sJndiURL);
- if (sJndiContextFactory != null)
- environment.setProperty(Context.INITIAL_CONTEXT_FACTORY,
- sJndiContextFactory);
- if (sJndiPkgPrefix != null)
- environment.setProperty(Context.URL_PKG_PREFIXES, sJndiPkgPrefix);
- Set<String> names = _config.getAttributeNames();
- for (String name : names)
- {
- if (name.startsWith("java.naming."))
- {
- environment.setProperty(name, _config.getAttribute(name));
- }
- }
- Context oJndiCtx = NamingContext.getServerContext(environment);
+ String sJndiURL = _config.getAttribute(JMSEpr.JNDI_URL_TAG);
+ String sJndiContextFactory = _config
+ .getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
+ String sJndiPkgPrefix = _config
+ .getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
+ if (sJndiURL != null)
+ environment.setProperty(Context.PROVIDER_URL, sJndiURL);
+ if (sJndiContextFactory != null)
+ environment.setProperty(Context.INITIAL_CONTEXT_FACTORY,
+ sJndiContextFactory);
+ if (sJndiPkgPrefix != null)
+ environment.setProperty(Context.URL_PKG_PREFIXES, sJndiPkgPrefix);
+ Set<String> names = _config.getAttributeNames();
+ for (String name : names) {
+ if (name.startsWith("java.naming.")) {
+ environment.setProperty(name, _config.getAttribute(name));
+ }
+ }
+ Context oJndiCtx = NamingContext.getServerContext(environment);
- if (null == oJndiCtx)
- throw new ConfigurationException("Unable fo obtain jndi context <"
- + sJndiURL + "," + sJndiContextFactory + ","
- + sJndiPkgPrefix + ">");
+ if (null == oJndiCtx)
+ throw new ConfigurationException("Unable fo obtain jndi context <"
+ + sJndiURL + "," + sJndiContextFactory + ","
+ + sJndiPkgPrefix + ">");
- String sFactClass = ListenerUtil.obtainAtt(_config,
- JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
- if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
- _logger.debug("No value specified for "
- + JMSEpr.CONNECTION_FACTORY_TAG + " attribute"
- + " - Using default of: '" + sFactClass + "'");
- _serviceCategory = _config
- .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
- _queueName, sFactClass, environment, _messageSelector);
+ String sFactClass = ListenerUtil.obtainAtt(_config,
+ JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
+ _logger.debug("No value specified for "
+ + JMSEpr.CONNECTION_FACTORY_TAG + " attribute"
+ + " - Using default of: '" + sFactClass + "'");
+ _serviceCategory = _config
+ .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
+ _queueName, sFactClass, environment, _messageSelector);
- _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass,
- JMSEpr.QUEUE_TYPE);
+ _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass,
+ JMSEpr.QUEUE_TYPE);
- try
- {
- _queueSession = _pool.getQueueSession();
- }
- catch (NamingException ne)
- {
- throw new ConfigurationException(
- "Failed to obtain queue session from pool", ne);
- }
+ try {
+ _queueSession = _pool.getQueueSession();
+ }
+ catch (NamingException ne) {
+ throw new ConfigurationException(
+ "Failed to obtain queue session from pool", ne);
+ }
- try
- {
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- }
- catch (NamingException nex)
- {
- try
- {
- oJndiCtx = NamingContext.getServerContext(environment);
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- }
- catch (NamingException ne)
- {
- _queue = _queueSession.createQueue(_queueName);
- }
- }
+ try {
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException nex) {
+ try {
+ oJndiCtx = NamingContext.getServerContext(environment);
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException ne) {
+ _queue = _queueSession.createQueue(_queueName);
+ }
+ }
- _messageReceiver = _queueSession.createReceiver(_queue,
- _messageSelector);
+ _messageReceiver = _queueSession.createReceiver(_queue,
+ _messageSelector);
} // ________________________________
/**
- * Receive one message and retry if connection
- *
- * @return javax.jms.Message - One input message, or null
- */
- protected javax.jms.Message receiveOne ()
- {
- while (isRunning())
- try
- {
- javax.jms.Message ret = _messageReceiver.receive(200);
- if (null != ret)
- return ret;
- }
- catch (JMSException oJ)
- {
- if (_logger.isDebugEnabled())
- {
- _logger
- .debug(
- "JMS error on receive. Attempting JMS Destination reconnect.",
- oJ);
- }
- try
- {
- prepareMessageReceiver();
- errorDelay = 0;
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconnecting to Queue", e);
- }
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1;
- }
- _logger
- .warn("Error reconnecting to Queue, backing off for "
- + errorDelay + " milliseconds");
- waitForRunningStateChange(
- ManagedLifecycleThreadState.STOPPING, errorDelay);
- }
- }
- return null;
+ * Receive one message and retry if connection
+ *
+ * @return javax.jms.Message - One input message, or null
+ */
+ protected javax.jms.Message receiveOne() {
+ while (isRunning())
+ try {
+ javax.jms.Message ret = _messageReceiver.receive(200);
+ if (null != ret)
+ return ret;
+ }
+ catch (JMSException oJ) {
+ if (_logger.isDebugEnabled()) {
+ _logger
+ .debug(
+ "JMS error on receive. Attempting JMS Destination reconnect.",
+ oJ);
+ }
+ try {
+ prepareMessageReceiver();
+ errorDelay = 0;
+ }
+ // try to reconnect to the queue
+ catch (Exception e) {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Reconnecting to Queue", e);
+ }
+ if (errorDelay == 0) {
+ errorDelay = MIN_ERROR_DELAY;
+ } else if (errorDelay < MAX_ERROR_DELAY) {
+ errorDelay <<= 1;
+ }
+ _logger
+ .warn("Error reconnecting to Queue, backing off for "
+ + errorDelay + " milliseconds");
+ waitForRunningStateChange(
+ ManagedLifecycleThreadState.STOPPING, errorDelay);
+ }
+ }
+ return null;
} // ________________________________
protected final static Logger _logger = Logger
- .getLogger(JmsGatewayListener.class);
+ .getLogger(JmsGatewayListener.class);
protected String _queueName;
@@ -509,18 +446,18 @@
protected JmsConnectionPool _pool;
/**
- * The minimum error delay.
- */
+ * The minimum error delay.
+ */
private static final long MIN_ERROR_DELAY = 1000;
/**
- * The maximum error delay.
- */
+ * The maximum error delay.
+ */
private static final long MAX_ERROR_DELAY = (MIN_ERROR_DELAY << 5);
/**
- * The error delay.
- */
+ * The error delay.
+ */
private long errorDelay;
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2007-07-11 10:55:44 UTC (rev 13353)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2007-07-11 11:36:12 UTC (rev 13354)
@@ -68,359 +68,313 @@
import org.jboss.soa.esb.util.Util;
/**
- *
* Polls an SQL table for rows that satisfy conditions defined in the xml
* runtime configuration
- *
+ * <p/>
* <p/>When a row that matches conditions is retrieved, it's contents are packed
* into an ESB Message and
- *
+ * <p/>
* <p/> The following fields are mandatory (see checkMyParms()): <br/> <br/>SQL
* table name <br/>list of fields to retrieve <br/>list of key fields to use in
* the update statement <br/>a field that will be used to mark a row as
* 'pending(p)', 'in process(w)', 'done(d)' or 'in error(e)'
- *
+ *
* @author <a
* href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
- *
*/
-public class SqlTableGatewayListener extends AbstractThreadedManagedLifecycle
-{
+public class SqlTableGatewayListener extends AbstractThreadedManagedLifecycle {
/**
- * serial version uid for this class
- */
+ * serial version uid for this class
+ */
private static final long serialVersionUID = -4394272471377134121L;
public SqlTableGatewayListener(ConfigTree config)
- throws ConfigurationException
- {
- super(config);
- _config = config;
- _sleepBetweenPolls = 10000; // milliseconds TODO magic number
- checkMyParms();
+ throws ConfigurationException {
+ super(config);
+ _config = config;
+ _sleepBetweenPolls = 10000; // milliseconds TODO magic number
+ checkMyParms();
} // __________________________________
/**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while initialisation.
- */
- protected void doInitialise () throws ManagedLifecycleException
- {
- try
- {
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
- _targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <"
- + _targetServiceName + "> not found in registry");
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException(
- "Unexpected registry exception", re);
- }
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise() throws ManagedLifecycleException {
+ try {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,
+ _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <"
+ + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re) {
+ throw new ManagedLifecycleException(
+ "Unexpected registry exception", re);
+ }
- boolean failure = true;
- try
- {
- prepareStatements();
- failure = false;
- }
- catch (final SQLException sqle)
- {
- throw new ManagedLifecycleException(
- "Unexpected error initialising statements", sqle);
- }
- finally
- {
- if (failure)
- {
- if (_dbConn != null)
- {
- _dbConn.release();
- _dbConn = null;
- }
- }
- }
+ boolean failure = true;
+ try {
+ prepareStatements();
+ failure = false;
+ }
+ catch (final SQLException sqle) {
+ throw new ManagedLifecycleException(
+ "Unexpected error initialising statements", sqle);
+ }
+ finally {
+ if (failure) {
+ if (_dbConn != null) {
+ _dbConn.release();
+ _dbConn = null;
+ }
+ }
+ }
}
/**
- * Execute on the thread.
- */
- protected void doRun ()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("doRun() method of "
- + this.getClass().getSimpleName() + " started on thread "
- + Thread.currentThread().getName());
- }
+ * Execute on the thread.
+ */
+ protected void doRun() {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("doRun() method of "
+ + this.getClass().getSimpleName() + " started on thread "
+ + Thread.currentThread().getName());
+ }
- do
- {
- for (Map<String, Object> row : pollForCandidates())
- {
- _currentRow = row;
- // Try to mark as 'in process' - if unsuccessful, somebody else
- // got it first
- if (!changeStatusToWorking())
- continue;
+ do {
+ for (Map<String, Object> row : pollForCandidates()) {
+ _currentRow = row;
+ // Try to mark as 'in process' - if unsuccessful, somebody else
+ // got it first
+ if (!changeStatusToWorking())
+ continue;
- Throwable thrown = null;
- String text = null;
- try
- {
- Object obj = _processMethod.invoke(_composer, new Object[]
- { _currentRow });
- if (null == obj)
- {
- _logger.warn("Action class method <"
- + _processMethod.getName()
- + "> returned a null object");
- continue;
- }
- Message message = (Message) obj;
- Map<String, Object> params = new HashMap<String, Object>();
-
- params.put(Environment.GATEWAY_CONFIG, _config);
-
- message = FilterManager.getInstance().doOutputWork(message, params);
-
- boolean bSent = false;
- for (EPR current : _targetEprs)
- {
- _courier = CourierFactory.getCourier(current);
- try
- {
- if (_courier.deliver(message))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier);
- }
- }
- if (!bSent)
- {
- text = "Target service <" + _targetServiceCategory
- + "," + _targetServiceName
- + "> is not registered";
- thrown = new Exception(text);
- }
- }
- catch (InvocationTargetException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
- }
- catch (IllegalAccessException e)
- {
- thrown = e;
- text = "Problems invoking method <"
- + _processMethod.getName() + ">";
- }
- catch (ClassCastException e)
- {
- thrown = e;
- text = "Action class method <" + _processMethod.getName()
- + "> returned a non Message object";
- }
- catch (CourierException e)
- {
- thrown = e;
- text = "Courier <" + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED";
- }
- catch (MalformedEPRException ex)
- {
- thrown = ex;
- text = "Courier <"
- + _courier.getClass().getName()
- + ".deliverAsync(Message) FAILED with malformed EPR.";
- }
+ Throwable thrown = null;
+ String text = null;
+ try {
+ Object obj = _processMethod.invoke(_composer, new Object[]
+ {_currentRow});
+ if (null == obj) {
+ _logger.warn("Action class method <"
+ + _processMethod.getName()
+ + "> returned a null object");
+ continue;
+ }
+ Message message = (Message) obj;
+ Map<String, Object> params = new HashMap<String, Object>();
- if (null == thrown)
- {
- if (_deleteAfterOK)
- deleteCurrentRow();
- else
- changeStatusToDone();
- }
- else
- {
- _logger.error(text);
- _logger.debug(text, thrown);
- changeStatusToError();
- }
- }
- }
- while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
- _sleepBetweenPolls));
+ params.put(Environment.GATEWAY_CONFIG, _config);
- if (_logger.isDebugEnabled())
- {
- _logger
- .debug("run() method of " + this.getClass().getSimpleName()
- + " finished on thread "
- + Thread.currentThread().getName());
- }
+ message = FilterManager.getInstance().doOutputWork(message, params);
+
+ boolean bSent = false;
+ for (EPR current : _targetEprs) {
+ _courier = CourierFactory.getCourier(current);
+ try {
+ if (_courier.deliver(message)) {
+ bSent = true;
+ break;
+ }
+ }
+ finally {
+ CourierUtil.cleanCourier(_courier);
+ }
+ }
+ if (!bSent) {
+ text = "Target service <" + _targetServiceCategory
+ + "," + _targetServiceName
+ + "> is not registered";
+ thrown = new Exception(text);
+ }
+ }
+ catch (InvocationTargetException e) {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+ }
+ catch (IllegalAccessException e) {
+ thrown = e;
+ text = "Problems invoking method <"
+ + _processMethod.getName() + ">";
+ }
+ catch (ClassCastException e) {
+ thrown = e;
+ text = "Action class method <" + _processMethod.getName()
+ + "> returned a non Message object";
+ }
+ catch (CourierException e) {
+ thrown = e;
+ text = "Courier <" + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED";
+ }
+ catch (MalformedEPRException ex) {
+ thrown = ex;
+ text = "Courier <"
+ + _courier.getClass().getName()
+ + ".deliverAsync(Message) FAILED with malformed EPR.";
+ }
+
+ if (null == thrown) {
+ if (_deleteAfterOK)
+ deleteCurrentRow();
+ else
+ changeStatusToDone();
+ } else {
+ _logger.error(text);
+ _logger.debug(text, thrown);
+ changeStatusToError();
+ }
+ }
+ }
+ while (!waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING,
+ _sleepBetweenPolls));
+
+ if (_logger.isDebugEnabled()) {
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName()
+ + " finished on thread "
+ + Thread.currentThread().getName());
+ }
} // ________________________________
/**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException
- * for errors while destroying.
- */
- protected void doDestroy () throws ManagedLifecycleException
- {
- if (_dbConn != null)
- {
- _dbConn.release();
- }
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy() throws ManagedLifecycleException {
+ if (_dbConn != null) {
+ _dbConn.release();
+ }
}
/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- private void checkMyParms () throws ConfigurationException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ private void checkMyParms() throws ConfigurationException {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- // Polling interval
- String sAux = _config
- .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
+ // Polling interval
+ String sAux = _config
+ .getAttribute(ListenerTagNames.POLL_LATENCY_SECS_TAG);
- if (!Util.isNullString(sAux))
- {
- try
- {
- _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid poll latency - keeping default of "
- + (_sleepBetweenPolls / 1000));
- }
- }
- else
- {
- _logger.warn("No value specified for: "
- + ListenerTagNames.POLL_LATENCY_SECS_TAG
- + " - Using default of " + (_sleepBetweenPolls / 1000));
- }
+ if (!Util.isNullString(sAux)) {
+ try {
+ _sleepBetweenPolls = 1000 * Long.parseLong(sAux);
+ }
+ catch (NumberFormatException e) {
+ _logger.warn("Invalid poll latency - keeping default of "
+ + (_sleepBetweenPolls / 1000));
+ }
+ } else {
+ _logger.warn("No value specified for: "
+ + ListenerTagNames.POLL_LATENCY_SECS_TAG
+ + " - Using default of " + (_sleepBetweenPolls / 1000));
+ }
- resolveComposerClass();
+ resolveComposerClass();
- _driver = ListenerUtil.obtainAtt(_config, JDBCEpr.DRIVER_TAG, null);
- _url = ListenerUtil.obtainAtt(_config, JDBCEpr.URL_TAG, null);
- _user = ListenerUtil.obtainAtt(_config, JDBCEpr.USERNAME_TAG, null);
- _password = ListenerUtil.obtainAtt(_config, JDBCEpr.PASSWORD_TAG, "");
+ _driver = ListenerUtil.obtainAtt(_config, JDBCEpr.DRIVER_TAG, null);
+ _url = ListenerUtil.obtainAtt(_config, JDBCEpr.URL_TAG, null);
+ _user = ListenerUtil.obtainAtt(_config, JDBCEpr.USERNAME_TAG, null);
+ _password = ListenerUtil.obtainAtt(_config, JDBCEpr.PASSWORD_TAG, "");
- _tableName = _config.getAttribute(ListenerTagNames.SQL_TABLE_NAME_TAG);
- if (null == _tableName)
- _tableName = _config.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG);
- if (Util.isNullString(_tableName))
- throw new ConfigurationException("Empty or invalid table name");
+ _tableName = _config.getAttribute(ListenerTagNames.SQL_TABLE_NAME_TAG);
+ if (null == _tableName)
+ _tableName = _config.getRequiredAttribute(JDBCEpr.TABLE_NAME_TAG);
+ if (Util.isNullString(_tableName))
+ throw new ConfigurationException("Empty or invalid table name");
- _selectFields = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_SELECT_FIELDS_TAG, "*");
- if (Util.isNullString(_selectFields))
- throw new ConfigurationException(
- "Empty or invalid list of select fields");
- _keyFields = _config.getAttribute(ListenerTagNames.SQL_KEY_FIELDS_TAG);
- if (null == _keyFields)
- _keyFields = _config
- .getRequiredAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG);
- if (Util.isNullString(_keyFields))
- throw new ConfigurationException(
- "Empty or invalid list of key fields");
- _inProcessField = _config
- .getAttribute(ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG);
- if (null == _inProcessField)
- _inProcessField = _config.getAttribute(JDBCEpr.STATUS_COLUMN_TAG);
- if (Util.isNullString(_inProcessField))
- throw new ConfigurationException(
- "A valid inProcessField attribute must be specified");
+ _selectFields = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_SELECT_FIELDS_TAG, "*");
+ if (Util.isNullString(_selectFields))
+ throw new ConfigurationException(
+ "Empty or invalid list of select fields");
+ _keyFields = _config.getAttribute(ListenerTagNames.SQL_KEY_FIELDS_TAG);
+ if (null == _keyFields)
+ _keyFields = _config
+ .getRequiredAttribute(JDBCEpr.MESSAGE_ID_COLUMN_TAG);
+ if (Util.isNullString(_keyFields))
+ throw new ConfigurationException(
+ "Empty or invalid list of key fields");
+ _inProcessField = _config
+ .getAttribute(ListenerTagNames.SQL_IN_PROCESS_FIELD_TAG);
+ if (null == _inProcessField)
+ _inProcessField = _config.getAttribute(JDBCEpr.STATUS_COLUMN_TAG);
+ if (Util.isNullString(_inProcessField))
+ throw new ConfigurationException(
+ "A valid inProcessField attribute must be specified");
- _where = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_WHERE_CONDITION_TAG, "");
- if (_where.trim().length() < 1)
- _logger.debug("No value specified for: "
- + ListenerTagNames.SQL_WHERE_CONDITION_TAG);
- _orderBy = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_ORDER_BY_TAG, "");
- if (_orderBy.trim().length() < 1)
- _logger.debug("No value specified for: "
- + ListenerTagNames.SQL_ORDER_BY_TAG);
- _inProcessVals = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG,
- DEFAULT_IN_PROCESS_STATES);
+ _where = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_WHERE_CONDITION_TAG, "");
+ if (_where.trim().length() < 1)
+ _logger.debug("No value specified for: "
+ + ListenerTagNames.SQL_WHERE_CONDITION_TAG);
+ _orderBy = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_ORDER_BY_TAG, "");
+ if (_orderBy.trim().length() < 1)
+ _logger.debug("No value specified for: "
+ + ListenerTagNames.SQL_ORDER_BY_TAG);
+ _inProcessVals = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG,
+ DEFAULT_IN_PROCESS_STATES);
- _deleteAfterOK = Boolean.parseBoolean(ListenerUtil.obtainAtt(_config,
- ListenerTagNames.SQL_POST_DEL_TAG, "false"));
- if (null == _config.getAttribute(ListenerTagNames.SQL_POST_DEL_TAG))
- _logger
- .debug("No value specified for: "
- + ListenerTagNames.SQL_POST_DEL_TAG
- + " - trigger row will not be deleted - 'in process field' will be used to show processing status");
+ _deleteAfterOK = Boolean.parseBoolean(ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.SQL_POST_DEL_TAG, "false"));
+ if (null == _config.getAttribute(ListenerTagNames.SQL_POST_DEL_TAG))
+ _logger
+ .debug("No value specified for: "
+ + ListenerTagNames.SQL_POST_DEL_TAG
+ + " - trigger row will not be deleted - 'in process field' will be used to show processing status");
- if (_inProcessVals.length() < 4)
- throw new ConfigurationException("Parameter <"
- + ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG
- + "> must be at least 4 characters long (PWED)");
+ if (_inProcessVals.length() < 4)
+ throw new ConfigurationException("Parameter <"
+ + ListenerTagNames.SQL_IN_PROCESS_VALUES_TAG
+ + "> must be at least 4 characters long (PWED)");
- _columns = _selectFields.split(",");
- if (_columns.length < 1)
- throw new ConfigurationException("Empty list of select fields");
+ _columns = _selectFields.split(",");
+ if (_columns.length < 1)
+ throw new ConfigurationException("Empty list of select fields");
- _keys = _keyFields.split(",");
- if (!"*".equals(_selectFields))
- {
- Set<String> colSet = new HashSet<String>(Arrays.asList(_columns));
- if (_keys.length < 1)
- throw new ConfigurationException("Empty list of keyFields");
- for (String currKey : _keys)
- {
- if (colSet.contains(currKey))
- continue;
- else
- {
- StringBuilder sb = new StringBuilder().append(
- "All key field names in the <").append(
- ListenerTagNames.SQL_KEY_FIELDS_TAG).append(
- "> attribute must be in the ").append(
- ListenerTagNames.SQL_SELECT_FIELDS_TAG).append(
- "list - '").append(currKey)
- .append("' is not there");
- throw new ConfigurationException(sb.toString());
- }
- }
- }
+ _keys = _keyFields.split(",");
+ if (!"*".equals(_selectFields)) {
+ Set<String> colSet = new HashSet<String>(Arrays.asList(_columns));
+ if (_keys.length < 1)
+ throw new ConfigurationException("Empty list of keyFields");
+ for (String currKey : _keys) {
+ if (colSet.contains(currKey))
+ continue;
+ else {
+ StringBuilder sb = new StringBuilder().append(
+ "All key field names in the <").append(
+ ListenerTagNames.SQL_KEY_FIELDS_TAG).append(
+ "> attribute must be in the ").append(
+ ListenerTagNames.SQL_SELECT_FIELDS_TAG).append(
+ "list - '").append(currKey)
+ .append("' is not there");
+ throw new ConfigurationException(sb.toString());
+ }
+ }
+ }
} // ________________________________
- protected void prepareStatements () throws SQLException
- {
- _PSscan = getDbConn().prepareStatement(scanStatement());
- _PSupdate = getDbConn().prepareStatement(updateStatement());
- _PSdeleteRow = getDbConn().prepareStatement(deleteStatement());
+ protected void prepareStatements() throws SQLException {
+ _PSscan = getDbConn().prepareStatement(scanStatement());
+ _PSupdate = getDbConn().prepareStatement(updateStatement());
+ _PSdeleteRow = getDbConn().prepareStatement(deleteStatement());
} // ________________________________
/*
@@ -429,399 +383,343 @@
* different types of setup exceptions.
*/
- protected void resolveComposerClass () throws ConfigurationException
- {
- try
- {
- String sProcessMethod = null;
- _composerName = _config
- .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config
- .getAttribute(
- ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
- "process");
- }
- else
- {
- _composerName = PackageRowContents.class.getName();
- _composerClass = PackageRowContents.class;
- _composer = new PackageRowContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
- + "> element found in configuration"
- + " - Using default composer class : "
- + _composerName);
- }
+ protected void resolveComposerClass() throws ConfigurationException {
+ try {
+ String sProcessMethod = null;
+ _composerName = _config
+ .getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName) { // class attribute
+ _composerClass = ClassUtil.forName(_composerName, getClass());
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ {ConfigTree.class});
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config
+ .getAttribute(
+ ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG,
+ "process");
+ } else {
+ _composerName = PackageRowContents.class.getName();
+ _composerClass = PackageRowContents.class;
+ _composer = new PackageRowContents();
+ sProcessMethod = "process";
+ _logger
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+ + "> element found in configuration"
+ + " - Using default composer class : "
+ + _composerName);
+ }
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[]
- { Object.class });
- }
- catch (InvocationTargetException ex)
- {
- _logger.debug(ex);
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]
+ {Object.class});
+ }
+ catch (InvocationTargetException ex) {
+ _logger.debug(ex);
- throw new ConfigurationException(ex);
- }
- catch (IllegalAccessException ex)
- {
- _logger.debug(ex);
+ throw new ConfigurationException(ex);
+ }
+ catch (IllegalAccessException ex) {
+ _logger.debug(ex);
- throw new ConfigurationException(ex);
- }
- catch (InstantiationException ex)
- {
- _logger.debug(ex);
+ throw new ConfigurationException(ex);
+ }
+ catch (InstantiationException ex) {
+ _logger.debug(ex);
- throw new ConfigurationException(ex);
- }
- catch (ClassNotFoundException ex)
- {
- _logger.debug(ex);
+ throw new ConfigurationException(ex);
+ }
+ catch (ClassNotFoundException ex) {
+ _logger.debug(ex);
- throw new ConfigurationException(ex);
- }
- catch (NoSuchMethodException ex)
- {
- _logger.debug(ex);
+ throw new ConfigurationException(ex);
+ }
+ catch (NoSuchMethodException ex) {
+ _logger.debug(ex);
- throw new ConfigurationException(ex);
- }
+ throw new ConfigurationException(ex);
+ }
} // ________________________________
- protected List<Map<String, Object>> pollForCandidates ()
- {
- List<Map<String, Object>> oResults = new ArrayList<Map<String, Object>>();
- final JdbcCleanConn oConn = getDbConn();
- try
- {
- ResultSet RS = oConn.execQueryWait(_PSscan, 1);
- ResultSetMetaData meta = RS.getMetaData();
- while (RS.next())
- {
- Map<String, Object> row = new HashMap<String, Object>();
- for (int iCurr = 1; iCurr <= meta.getColumnCount(); iCurr++)
- {
- String sCol = meta.getColumnName(iCurr);
- if (!_inProcessField.equals(sCol))
- row.put(sCol, RS.getObject(iCurr));
- }
+ protected List<Map<String, Object>> pollForCandidates() {
+ List<Map<String, Object>> oResults = new ArrayList<Map<String, Object>>();
+ final JdbcCleanConn oConn = getDbConn();
+ try {
+ ResultSet RS = oConn.execQueryWait(_PSscan, 1);
+ ResultSetMetaData meta = RS.getMetaData();
+ while (RS.next()) {
+ Map<String, Object> row = new HashMap<String, Object>();
+ for (int iCurr = 1; iCurr <= meta.getColumnCount(); iCurr++) {
+ String sCol = meta.getColumnName(iCurr);
+ if (!_inProcessField.equals(sCol))
+ row.put(sCol, RS.getObject(iCurr));
+ }
- oResults.add(row);
- }
- }
- catch (Exception e)
- {
- _logger.debug("Some triggers might not have been returned", e);
- }
- finally
- {
- try
- {
- oConn.rollback();
- }
- catch (final SQLException sqle)
- {
- }
- }
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Returning " + oResults.size() + " rows.\n");
- }
- return oResults;
+ oResults.add(row);
+ }
+ }
+ catch (Exception e) {
+ _logger.debug("Some triggers might not have been returned", e);
+ }
+ finally {
+ try {
+ oConn.rollback();
+ }
+ catch (final SQLException sqle) {
+ }
+ }
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Returning " + oResults.size() + " rows.\n");
+ }
+ return oResults;
} // ________________________________
/**
- * Obtain a new database connection with parameter info
- *
- * @return A new connection
- * @throws ConfigurationException -
- * if problems are encountered
- */
- protected JdbcCleanConn getDbConn ()
- {
- if (null == _dbConn)
- {
- DataSource oDS = new SimpleDataSource(_driver, _url, _user,
- _password);
- _dbConn = new JdbcCleanConn(oDS);
- }
- return _dbConn;
+ * Obtain a new database connection with parameter info
+ *
+ * @return A new connection
+ * @throws ConfigurationException -
+ * if problems are encountered
+ */
+ protected JdbcCleanConn getDbConn() {
+ if (null == _dbConn) {
+ DataSource oDS = new SimpleDataSource(_driver, _url, _user,
+ _password);
+ _dbConn = new JdbcCleanConn(oDS);
+ }
+ return _dbConn;
} // ________________________________
/**
- * Assemble the SQL statement to scan (poll) the table
- *
- * @return - The resulting SQL statement
- */
- protected String scanStatement ()
- {
- StringBuilder sb = new StringBuilder().append("select ").append(
- _selectFields).append(" from ").append(_tableName);
+ * Assemble the SQL statement to scan (poll) the table
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String scanStatement() {
+ StringBuilder sb = new StringBuilder().append("select ").append(
+ _selectFields).append(" from ").append(_tableName);
- boolean bWhere = !Util.isNullString(_where);
- if (bWhere)
- sb.append(" where ").append(_where);
- sb.append((bWhere) ? " and " : " where ");
+ boolean bWhere = !Util.isNullString(_where);
+ if (bWhere)
+ sb.append(" where ").append(_where);
+ sb.append((bWhere) ? " and " : " where ");
- String sLike = _inProcessVals.substring(0, 1).toUpperCase();
- sb.append(" upper(").append(_inProcessField).append(") like '").append(
- sLike).append("%'");
+ String sLike = _inProcessVals.substring(0, 1).toUpperCase();
+ sb.append(" upper(").append(_inProcessField).append(") like '").append(
+ sLike).append("%'");
- if (!Util.isNullString(_orderBy))
- sb.append(" order by ").append(_orderBy);
- return sb.toString();
+ if (!Util.isNullString(_orderBy))
+ sb.append(" order by ").append(_orderBy);
+ return sb.toString();
} // ________________________________
/**
- * Assemble the SQL statement to update the field in the
- * "inProcessField" parameter
- *
- * in the table row uniquely identified by the list of fields in the
- * "keyFields" parameter
- *
- * @return - The resulting SQL statement
- */
- protected String updateStatement ()
- {
- StringBuilder sb = new StringBuilder().append("update ").append(
- _tableName).append(" set ").append(_inProcessField).append(
- " = ? where ").append(_inProcessField).append(" = ?");
- for (String sCurr : _keys)
- {
- sb.append(" and ").append(sCurr).append(" = ?");
- }
- return sb.toString();
+ * Assemble the SQL statement to update the field in the
+ * "inProcessField" parameter
+ * <p/>
+ * in the table row uniquely identified by the list of fields in the
+ * "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String updateStatement() {
+ StringBuilder sb = new StringBuilder().append("update ").append(
+ _tableName).append(" set ").append(_inProcessField).append(
+ " = ? where ").append(_inProcessField).append(" = ?");
+ for (String sCurr : _keys) {
+ sb.append(" and ").append(sCurr).append(" = ?");
+ }
+ return sb.toString();
} // ________________________________
/**
- * Assemble the SQL "select for update" statement for the
- * "inProcessField" parameter
- *
- * in the table row uniquely identified by the list of fields in the
- * "keyFields" parameter
- *
- * @return - The resulting SQL statement
- */
- protected String selectForUpdStatement ()
- {
- StringBuilder sb = new StringBuilder().append("select ").append(
- _inProcessField).append(" from ").append(_tableName).append(
- " where ");
- int iCurr = 0;
- for (String sCurr : _keys)
- {
- if (iCurr++ > 0)
- sb.append(" and ");
- sb.append(sCurr).append(" = ?");
- }
+ * Assemble the SQL "select for update" statement for the
+ * "inProcessField" parameter
+ * <p/>
+ * in the table row uniquely identified by the list of fields in the
+ * "keyFields" parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String selectForUpdStatement() {
+ StringBuilder sb = new StringBuilder().append("select ").append(
+ _inProcessField).append(" from ").append(_tableName).append(
+ " where ");
+ int iCurr = 0;
+ for (String sCurr : _keys) {
+ if (iCurr++ > 0)
+ sb.append(" and ");
+ sb.append(sCurr).append(" = ?");
+ }
- /*
- * HS QL does not support FOR UPDATE! All tables appear to be inherently
- * updatable!
- */
+ /*
+ * HS QL does not support FOR UPDATE! All tables appear to be inherently
+ * updatable!
+ */
- if (_driver.contains("hsqldb"))
- return sb.toString();
- else
- return sb.append(" for update").toString();
+ if (_driver.contains("hsqldb"))
+ return sb.toString();
+ else
+ return sb.append(" for update").toString();
} // ________________________________
/**
- * Assemble the SQL statement to delete the current row in the table row
- * uniquely identified by the list of fields in the "keyFields"
- * parameter
- *
- * @return - The resulting SQL statement
- */
- protected String deleteStatement ()
- {
- StringBuilder sb = new StringBuilder().append("delete from ").append(
- _tableName).append(" where ");
- int iCurr = 0;
- for (String sCurr : _keys)
- {
- if (iCurr++ > 0)
- sb.append(" and ");
- sb.append(sCurr).append(" = ?");
- }
- return sb.toString();
+ * Assemble the SQL statement to delete the current row in the table row
+ * uniquely identified by the list of fields in the "keyFields"
+ * parameter
+ *
+ * @return - The resulting SQL statement
+ */
+ protected String deleteStatement() {
+ StringBuilder sb = new StringBuilder().append("delete from ").append(
+ _tableName).append(" where ");
+ int iCurr = 0;
+ for (String sCurr : _keys) {
+ if (iCurr++ > 0)
+ sb.append(" and ");
+ sb.append(sCurr).append(" = ?");
+ }
+ return sb.toString();
} // ________________________________
/**
- * Try to delete 'current row' from polled table
- *
- * @return true if row deletion was successful - false otherwise
- */
- protected boolean deleteCurrentRow ()
- {
- try
- {
- int iParm = 1;
- for (String sColName : _keys)
- {
- final String val = String.valueOf(_currentRow.get(sColName));
- _PSdeleteRow.setString(iParm++, val);
- }
+ * Try to delete 'current row' from polled table
+ *
+ * @return true if row deletion was successful - false otherwise
+ */
+ protected boolean deleteCurrentRow() {
+ try {
+ int iParm = 1;
+ for (String sColName : _keys) {
+ final String val = String.valueOf(_currentRow.get(sColName));
+ _PSdeleteRow.setString(iParm++, val);
+ }
- try
- {
- getDbConn().execUpdWait(_PSdeleteRow, 5);
- getDbConn().commit();
- return true;
- }
- catch (Exception e)
- {
- _logger.debug("Delete row has failed. Rolling back!!", e);
- }
+ try {
+ getDbConn().execUpdWait(_PSdeleteRow, 5);
+ getDbConn().commit();
+ return true;
+ }
+ catch (Exception e) {
+ _logger.debug("Delete row has failed. Rolling back!!", e);
+ }
- try
- {
- getDbConn().rollback();
- }
- catch (Exception e)
- {
- _logger.debug("Unable to rollback delete row", e);
- }
- }
- catch (Exception e)
- {
- _logger.debug("Unexpected exception.", e);
- }
- return false;
+ try {
+ getDbConn().rollback();
+ }
+ catch (Exception e) {
+ _logger.debug("Unable to rollback delete row", e);
+ }
+ }
+ catch (Exception e) {
+ _logger.debug("Unexpected exception.", e);
+ }
+ return false;
} // ________________________________
- protected String getStatus (ROW_STATE p_oState)
- {
- int iPos = p_oState.ordinal();
- return _inProcessVals.substring(iPos, ++iPos);
+ protected String getStatus(ROW_STATE p_oState) {
+ int iPos = p_oState.ordinal();
+ return _inProcessVals.substring(iPos, ++iPos);
} // ________________________________
- protected boolean changeStatusToWorking ()
- {
- return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+ protected boolean changeStatusToWorking() {
+ return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
} // ________________________________
- protected boolean changeStatusToDone ()
- {
- return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+ protected boolean changeStatusToDone() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
} // ________________________________
- protected boolean changeStatusToError ()
- {
- return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+ protected boolean changeStatusToError() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
} // ________________________________
- protected boolean changeStatus (ROW_STATE fromState, ROW_STATE toState)
- {
- try
- {
- getDbConn();
- }
- catch (Exception e)
- {
- _logger.debug("Unable to get DB connection.", e);
- throw new IllegalStateException("Unable to get DB connection.", e);
- }
+ protected boolean changeStatus(ROW_STATE fromState, ROW_STATE toState) {
+ try {
+ getDbConn();
+ }
+ catch (Exception e) {
+ _logger.debug("Unable to get DB connection.", e);
+ throw new IllegalStateException("Unable to get DB connection.", e);
+ }
- try
- {
- int iParm = 3;
- for (String sColName : _keys)
- {
- Object oVal = String.valueOf(_currentRow.get(sColName));
- _PSupdate.setObject(iParm++, oVal);
- }
+ try {
+ int iParm = 3;
+ for (String sColName : _keys) {
+ Object oVal = String.valueOf(_currentRow.get(sColName));
+ _PSupdate.setObject(iParm++, oVal);
+ }
- try
- {
- _PSupdate.setString(1, getStatus(toState));
- _PSupdate.setString(2, getStatus(fromState));
- final int count = getDbConn().execUpdWait(_PSupdate, 5);
- if (count == 1)
- {
- getDbConn().commit();
+ try {
+ _PSupdate.setString(1, getStatus(toState));
+ _PSupdate.setString(2, getStatus(fromState));
+ final int count = getDbConn().execUpdWait(_PSupdate, 5);
+ if (count == 1) {
+ getDbConn().commit();
- if (_logger.isDebugEnabled())
- _logger.debug("Successfully changed row state from "
- + fromState + " to " + toState + ".");
+ if (_logger.isDebugEnabled())
+ _logger.debug("Successfully changed row state from "
+ + fromState + " to " + toState + ".");
- return true;
- }
- else
- {
- _logger.warn("Cannot change row state from " + fromState
- + " to " + toState + ". Number of rows in state "
- + fromState + " = " + count);
- return false;
- }
- }
- catch (Exception e)
- {
- final String message = "Row status change to " + toState
- + " has failed. Rolling back!!";
- _logger.error(message);
- _logger.debug(message, e);
- }
+ return true;
+ } else {
+ _logger.warn("Cannot change row state from " + fromState
+ + " to " + toState + ". Number of rows in state "
+ + fromState + " = " + count);
+ return false;
+ }
+ }
+ catch (Exception e) {
+ final String message = "Row status change to " + toState
+ + " has failed. Rolling back!!";
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
- try
- {
- getDbConn().rollback();
- }
- catch (Exception e)
- {
- final String message = "Unable to rollback row status change to "
- + fromState;
- _logger.error(message);
- _logger.debug(message, e);
- }
- }
- catch (Exception e)
- {
- final String message = "Unexpected exception.";
- _logger.error(message);
- _logger.debug(message, e);
- }
+ try {
+ getDbConn().rollback();
+ }
+ catch (Exception e) {
+ final String message = "Unable to rollback row status change to "
+ + fromState;
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
+ }
+ catch (Exception e) {
+ final String message = "Unexpected exception.";
+ _logger.error(message);
+ _logger.debug(message, e);
+ }
- return false;
+ return false;
} // ________________________________
/**
- * Default gateway action for SQL table rows <p/>It will just drop the
- * result set contents into a Message
- *
- * @author <a
- * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
- * @since Version 4.0
- *
- */
- public static class PackageRowContents
- {
- public Message process (Object obj)
- {
- if (!(obj instanceof Serializable))
- throw new IllegalArgumentException(
- "Object must be instance of Map");
+ * Default gateway action for SQL table rows <p/>It will just drop the
+ * result set contents into a Message
+ *
+ * @author <a
+ * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+ public static class PackageRowContents {
+ public Message process(Object obj) {
+ if (!(obj instanceof Serializable))
+ throw new IllegalArgumentException(
+ "Object must be instance of Map");
- Message message = MessageFactory.getInstance().getMessage();
- org.jboss.soa.esb.message.Properties props = message
- .getProperties();
+ Message message = MessageFactory.getInstance().getMessage();
+ org.jboss.soa.esb.message.Properties props = message
+ .getProperties();
- props.setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);
+ props.setProperty(ListenerTagNames.SQL_ROW_DATA_TAG, obj);
- return message;
- }
+ return message;
+ }
} // ____________________________________________________
protected final static Logger _logger = Logger
- .getLogger(SqlTableGatewayListener.class);
+ .getLogger(SqlTableGatewayListener.class);
protected ConfigTree _config;
@@ -859,9 +757,8 @@
protected Map<String, Object> _currentRow;
- public static enum ROW_STATE
- {
- Pending, Working, Error, Done
+ public static enum ROW_STATE {
+ Pending, Working, Error, Done
}
public static final String DEFAULT_IN_PROCESS_STATES = "PWED";
More information about the jboss-svn-commits
mailing list