[jboss-svn-commits] JBL Code SVN: r10248 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Mar 15 21:59:04 EDT 2007
Author: bill.burke at jboss.com
Date: 2007-03-15 21:59:04 -0400 (Thu, 15 Mar 2007)
New Revision: 10248
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
formatting
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-16 00:57:00 UTC (rev 10247)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-16 01:59:04 UTC (rev 10248)
@@ -22,26 +22,6 @@
package org.jboss.soa.esb.listeners.gateway;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Enumeration;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueSession;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.NamingException;
-
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
@@ -62,473 +42,497 @@
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Enumeration;
+
public class JmsGatewayListener extends AbstractThreadedManagedLifecycle
{
- /**
- * serial version uid for this class
- */
- private static final long serialVersionUID = 5070422864110923930L;
-
- public JmsGatewayListener (ConfigTree listenerConfig) throws ConfigurationException
- {
- super(listenerConfig) ;
- _config = listenerConfig;
- _sleepForRetries = 3000; // milliseconds TODO magic number
- checkMyParms();
- } // __________________________________
+ /**
+ * serial version uid for this class
+ */
+ private static final long serialVersionUID = 5070422864110923930L;
-
- /**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
- */
- protected void doInitialise()
- throws ManagedLifecycleException
- {
+ public JmsGatewayListener(ConfigTree listenerConfig) throws ConfigurationException
+ {
+ super(listenerConfig);
+ _config = listenerConfig;
+ _sleepForRetries = 3000; // milliseconds TODO magic number
+ checkMyParms();
+ } // __________________________________
+
+
+ /**
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise()
+ throws ManagedLifecycleException
+ {
+ try
+ {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry");
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException("Unexpected registry exception", re);
+ }
+
+ if (_serviceName != null)
+ {
+ try
+ {
+ RegistryUtil.register(_config, _myEpr);
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException("Unexpected error during registration for epr " + _myEpr, re);
+ }
+ }
+
+ try
+ {
+ prepareMessageReceiver();
+ }
+ catch (final JMSException jmse)
+ {
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
+ throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", jmse);
+ }
+ catch (final ConfigurationException ce)
+ {
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
+ throw new ManagedLifecycleException("Unexpected configuration exception from prepareMessageReceiver", ce);
+ }
+ }
+
+ /**
+ * Execute on the thread.
+ */
+ protected void doRun()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName() +
+ " started on thread " + Thread.currentThread().getName());
+ }
+
+ while (isRunning())
+ {
+ javax.jms.Message msgIn = receiveOne();
+ if (null != msgIn)
try
{
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
+ Object obj = _processMethod.invoke(_composer,
+ new Object[]{msgIn});
+ if (null == obj)
+ {
+ _logger.warn("Action class method <" + _processMethod
+ .getName() + "> returned a null object");
+ continue;
+ }
+ // try to deliver the composed message, using the
+ // appropriate courier
+ // to the target service
+ try
+ {
+ boolean bSent = false;
+ for (EPR current : _targetEprs)
+ {
+ _courier = CourierFactory.getCourier(current);
+ try
+ {
+ if (_courier
+ .deliver((org.jboss.soa.esb.message.Message) obj))
+ {
+ bSent = true;
+ break;
+ }
+ }
+ finally
+ {
+ CourierUtil.cleanCourier(_courier);
+ }
+ }
+ if (!bSent)
+ {
+ String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
+ throw new Exception(text);
+ }
+ }
+ catch (ClassCastException e)
+ {
+ _logger.error("Action class method <" + _processMethod
+ .getName() + "> returned a non Message object",
+ e);
+ continue;
+ }
+ catch (CourierException e)
+ {
+ String text = (null != _courier) ? "Courier <" + _courier
+ .getClass().getName() + ".deliver(Message) FAILED" : "NULL courier can't deliver Message";
+ _logger.error(text, e);
+ continue;
+ }
+ continue;
}
- catch (final RegistryException re)
+ catch (InvocationTargetException e)
{
- throw new ManagedLifecycleException("Unexpected registry exception", re) ;
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
}
-
- if (_serviceName != null)
+ catch (IllegalAccessException e)
{
- try
- {
- RegistryUtil.register(_config, _myEpr);
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _myEpr, re);
- }
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
}
-
- try
+ catch (Exception e)
{
- prepareMessageReceiver();
+ _logger.error("Unexpected problem", e);
}
- catch (final JMSException jmse)
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
- }
- throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", jmse);
- }
- catch (final ConfigurationException ce)
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
- }
- throw new ManagedLifecycleException("Unexpected configuration exception from prepareMessageReceiver", ce);
- }
- }
-
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName() +
- " started on thread " + Thread.currentThread().getName());
- }
+ }
- while (isRunning())
- {
- javax.jms.Message msgIn = receiveOne();
- if (null != msgIn)
- try
- {
- Object obj = _processMethod.invoke(_composer,
- new Object[] { msgIn });
- if (null == obj)
- {
- _logger.warn("Action class method <" + _processMethod
- .getName() + "> returned a null object");
- continue;
- }
- // try to deliver the composed message, using the
- // appropriate courier
- // to the target service
- try
- {
- boolean bSent = false;
- for (EPR current : _targetEprs)
- {
- _courier = CourierFactory.getCourier(current);
- try
- {
- if (_courier
- .deliver((org.jboss.soa.esb.message.Message) obj))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier) ;
- }
- }
- if (!bSent)
- {
- String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
- throw new Exception(text);
- }
- }
- catch (ClassCastException e)
- {
- _logger.error("Action class method <" + _processMethod
- .getName() + "> returned a non Message object",
- e);
- continue;
- }
- catch (CourierException e)
- {
- String text = (null != _courier) ? "Courier <" + _courier
- .getClass().getName() + ".deliver(Message) FAILED" : "NULL courier can't deliver Message";
- _logger.error(text, e);
- continue;
- }
- continue;
- }
- catch (InvocationTargetException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
- catch (IllegalAccessException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
- catch (Exception e)
- {
- _logger.error("Unexpected problem", e);
- }
- }
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName() + " finished on thread " + Thread
+ .currentThread().getName());
+ } // ________________________________
- _logger
- .debug("run() method of " + this.getClass().getSimpleName() + " finished on thread " + Thread
- .currentThread().getName());
- } // ________________________________
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy()
+ throws ManagedLifecycleException
+ {
+ if (_messageReceiver != null)
+ {
+ try
+ {
+ _messageReceiver.close();
+ }
+ catch (final JMSException jmse)
+ {
+ } // ignore
+ }
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- if (_messageReceiver != null)
- {
- try
- {
- _messageReceiver.close();
- }
- catch (final JMSException jmse) {} // ignore
- }
-
- if (_queueSession != null)
- {
- try
- {
- _queueSession.close();
- }
- catch (final JMSException jmse) {} // ignore
- }
-
- if (_queueConnection != null)
- {
- try
- {
- _queueConnection.close();
- }
- catch (final JMSException jmse) {} // ignore
- }
-
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
- }
-
- }
+ if (_queueSession != null)
+ {
+ try
+ {
+ _queueSession.close();
+ }
+ catch (final JMSException jmse)
+ {
+ } // ignore
+ }
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms () throws ConfigurationException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ if (_queueConnection != null)
+ {
+ try
+ {
+ _queueConnection.close();
+ }
+ catch (final JMSException jmse)
+ {
+ } // ignore
+ }
- _queueName = ListenerUtil.obtainAtt(_config,
- JMSEpr.DESTINATION_NAME_TAG, null);
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
- resolveComposerClass();
+ }
- // No problem if selector is null - everything in queue will be returned
- _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
- _logger
- .debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG + " - All messages in queue will be received by this listener");
- } // ________________________________
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws ConfigurationException
+ {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- protected void resolveComposerClass () throws ConfigurationException
- {
- try
- {
- String sProcessMethod = null;
- _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = Class.forName(_composerName);
- Constructor oConst = _composerClass.getConstructor(new Class[]
- { ConfigTree.class });
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
- }
- else
- {
- _composerName = PackageJmsMessageContents.class.getName();
- _composerClass = PackageJmsMessageContents.class;
- _composer = new PackageJmsMessageContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
- }
-
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[] { Object.class });
- }
- catch (Exception ex)
- {
- throw new ConfigurationException(ex);
- }
- } // ________________________________
+ _queueName = ListenerUtil.obtainAtt(_config,
+ JMSEpr.DESTINATION_NAME_TAG, null);
- private void prepareMessageReceiver () throws ConfigurationException, JMSException
- {
- _queueConnection = null;
- _queueSession = null;
- _queue = null;
+ resolveComposerClass();
- String sJndiURL = ListenerUtil.obtainAtt(_config, JMSEpr.JNDI_URL_TAG,"");
- if (null == _config.getAttribute(JMSEpr.JNDI_URL_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_URL_TAG + " attribute" + " - Using default of: '" + sJndiURL + "'");
- String sJndiContextFactory = ListenerUtil.obtainAtt(_config,
- JMSEpr.JNDI_CONTEXT_FACTORY_TAG,"");
- if (null == _config.getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_CONTEXT_FACTORY_TAG + " attribute" + " - Using default of: '" + sJndiContextFactory + "'");
- String sJndiPkgPrefix = ListenerUtil.obtainAtt(_config,
- JMSEpr.JNDI_PKG_PREFIX_TAG, "");
- if (null == _config.getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_PKG_PREFIX_TAG + " attribute" + " - Using default of: '" + sJndiPkgPrefix + "'");
- Context oJndiCtx = NamingContext.getServerContext(sJndiURL,
- sJndiContextFactory, sJndiPkgPrefix);
- if (null == oJndiCtx)
- throw new ConfigurationException(
- "Unable fo obtain jndi context <" + sJndiURL + "," + sJndiContextFactory + "," + sJndiPkgPrefix + ">");
+ // No problem if selector is null - everything in queue will be returned
+ _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ _logger
+ .debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG + " - All messages in queue will be received by this listener");
+ } // ________________________________
- String sFactClass = ListenerUtil.obtainAtt(_config,
- JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
- if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.CONNECTION_FACTORY_TAG + " attribute" + " - Using default of: '" + sFactClass + "'");
- _serviceCategory = _config
- .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
- _queueName, sFactClass, sJndiURL, sJndiContextFactory,
- sJndiPkgPrefix, _messageSelector);
-
- Object tmp = null;
-
- try
- {
- tmp = oJndiCtx.lookup(sFactClass);
- }
- catch (NamingException ex)
- {
- throw new ConfigurationException(ex);
- }
-
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+ protected void resolveComposerClass() throws ConfigurationException
+ {
+ try
+ {
+ String sProcessMethod = null;
+ _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName)
+ { // class attribute
+ _composerClass = Class.forName(_composerName);
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ {ConfigTree.class});
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
+ }
+ else
+ {
+ _composerName = PackageJmsMessageContents.class.getName();
+ _composerClass = PackageJmsMessageContents.class;
+ _composer = new PackageJmsMessageContents();
+ sProcessMethod = "process";
+ _logger
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
+ }
- _queueConnection = qcf.createQueueConnection();
- _queueSession = _queueConnection.createQueueSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
- try
- {
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- }
- catch (NamingException ne)
- {
- _queue = _queueSession.createQueue(_queueName);
- }
- _queueConnection.start();
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[]{Object.class});
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
- _messageReceiver = _queueSession.createReceiver(_queue,
- _messageSelector);
- if (null != oJndiCtx) try
+ private void prepareMessageReceiver() throws ConfigurationException, JMSException
+ {
+ _queueConnection = null;
+ _queueSession = null;
+ _queue = null;
+
+ String sJndiURL = ListenerUtil.obtainAtt(_config, JMSEpr.JNDI_URL_TAG, "");
+ if (null == _config.getAttribute(JMSEpr.JNDI_URL_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_URL_TAG + " attribute" + " - Using default of: '" + sJndiURL + "'");
+ String sJndiContextFactory = ListenerUtil.obtainAtt(_config,
+ JMSEpr.JNDI_CONTEXT_FACTORY_TAG, "");
+ if (null == _config.getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_CONTEXT_FACTORY_TAG + " attribute" + " - Using default of: '" + sJndiContextFactory + "'");
+ String sJndiPkgPrefix = ListenerUtil.obtainAtt(_config,
+ JMSEpr.JNDI_PKG_PREFIX_TAG, "");
+ if (null == _config.getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_PKG_PREFIX_TAG + " attribute" + " - Using default of: '" + sJndiPkgPrefix + "'");
+ Context oJndiCtx = NamingContext.getServerContext(sJndiURL,
+ sJndiContextFactory, sJndiPkgPrefix);
+ if (null == oJndiCtx)
+ throw new ConfigurationException(
+ "Unable fo obtain jndi context <" + sJndiURL + "," + sJndiContextFactory + "," + sJndiPkgPrefix + ">");
+
+ String sFactClass = ListenerUtil.obtainAtt(_config,
+ JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.CONNECTION_FACTORY_TAG + " attribute" + " - Using default of: '" + sFactClass + "'");
+ _serviceCategory = _config
+ .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
+ _queueName, sFactClass, sJndiURL, sJndiContextFactory,
+ sJndiPkgPrefix, _messageSelector);
+
+ Object tmp = null;
+
+ try
+ {
+ tmp = oJndiCtx.lookup(sFactClass);
+ }
+ catch (NamingException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+ _queueConnection = qcf.createQueueConnection();
+ _queueSession = _queueConnection.createQueueSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ try
+ {
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException ne)
+ {
+ _queue = _queueSession.createQueue(_queueName);
+ }
+ _queueConnection.start();
+
+ _messageReceiver = _queueSession.createReceiver(_queue,
+ _messageSelector);
+ if (null != oJndiCtx) try
+ {
+ oJndiCtx.close();
+ }
+ catch (NamingException ne)
+ {
+ _logger.error(ne.getMessage(), ne);
+ }
+
+ } // ________________________________
+
+ /**
+ * Receive one message and retry if connection
+ *
+ * @return javax.jms.Message - One input message, or null
+ */
+ protected javax.jms.Message receiveOne()
+ {
+ while (isRunning())
+ try
{
- oJndiCtx.close();
+ javax.jms.Message ret = _messageReceiver.receive(200);
+ if (null != ret) return ret;
}
- catch (NamingException ne)
+ catch (JMSException oJ)
{
- _logger.error(ne.getMessage(), ne);
+ _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
+ try
+ {
+ prepareMessageReceiver();
+ }
+ // try to reconnect to the queue
+ catch (Exception e)
+ {
+ _logger.error("Reconnecting to Queue", e);
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries);
+ }
}
+ return null;
+ } // ________________________________
- } // ________________________________
+ /**
+ * Default gateway action for plain jms messages <p/>It will just drop the
+ * jms message contents into a esb Message
+ *
+ * @author <a
+ * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+ private static class PackageJmsMessageContents
+ {
+ @SuppressWarnings("unchecked")
+ public Message process(Object obj) throws JMSException, IOException
+ {
+ if (!(obj instanceof javax.jms.Message))
+ throw new IllegalArgumentException(
+ "Object must be instance of javax.jms.Message");
+ byte[] bytes = getMessageContent((javax.jms.Message) obj);
+ if (null == bytes) return null;
- /**
- * Receive one message and retry if connection
- *
- * @return javax.jms.Message - One input message, or null
- */
- protected javax.jms.Message receiveOne ()
- {
- while (isRunning())
- try
- {
- javax.jms.Message ret = _messageReceiver.receive(200);
- if (null != ret) return ret;
- }
- catch (JMSException oJ)
- {
- _logger.error("JMS error on receive. Attempting JMS Destination reconnect.",oJ);
- try
- {
- prepareMessageReceiver();
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- _logger.error("Reconnecting to Queue", e);
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries) ;
- }
- }
- return null;
- } // ________________________________
+ javax.jms.Message jmsMsg = (javax.jms.Message) obj;
+ Message message = MessageFactory.getInstance().getMessage();
+ message.getBody().setContents(getMessageContent(jmsMsg));
+ Enumeration<String> EE = jmsMsg.getPropertyNames();
+ if (null != EE)
+ {
+ while (EE.hasMoreElements())
+ {
+ String name = EE.nextElement();
+ Object value = jmsMsg.getObjectProperty(name);
+ if (null != value)
+ message.getProperties().setProperty(name, value);
+ }
+ }
+ return message;
+ }
- /**
- * Default gateway action for plain jms messages <p/>It will just drop the
- * jms message contents into a esb Message
- *
- * @author <a
- * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
- * @since Version 4.0
- *
- */
- private static class PackageJmsMessageContents
- {
- @SuppressWarnings("unchecked")
- public Message process (Object obj) throws JMSException, IOException
- {
- if (!(obj instanceof javax.jms.Message))
- throw new IllegalArgumentException(
- "Object must be instance of javax.jms.Message");
- byte[] bytes = getMessageContent((javax.jms.Message) obj);
- if (null == bytes) return null;
+ private byte[] getMessageContent(javax.jms.Message jMess) throws JMSException, IOException
+ {
+ if (jMess instanceof TextMessage)
+ return ((TextMessage) jMess).getText().getBytes();
- javax.jms.Message jmsMsg = (javax.jms.Message) obj;
- Message message = MessageFactory.getInstance().getMessage();
- message.getBody().setContents(getMessageContent(jmsMsg));
- Enumeration<String> EE = jmsMsg.getPropertyNames();
- if (null != EE)
- {
- while (EE.hasMoreElements())
- {
- String name = EE.nextElement();
- Object value = jmsMsg.getObjectProperty(name);
- if (null != value)
- message.getProperties().setProperty(name, value);
- }
- }
- return message;
- }
+ if (jMess instanceof BytesMessage)
+ {
+ BytesMessage jBytes = (BytesMessage) jMess;
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] ba = new byte[1000];
+ int iQread;
+ while (-1 != (iQread = jBytes.readBytes(ba)))
+ if (iQread > 0) out.write(ba, 0, iQread);
+ out.close();
+ return out.toByteArray();
+ }
- private byte[] getMessageContent (javax.jms.Message jMess) throws JMSException, IOException
- {
- if (jMess instanceof TextMessage)
- return ((TextMessage) jMess).getText().getBytes();
+ if (jMess instanceof ObjectMessage)
+ return ((ObjectMessage) jMess).getObject().toString()
+ .getBytes();
+ _logger
+ .warn("Message type " + jMess.getClass().getSimpleName() + " not supported - Message is ignored");
+ return null;
+ }
+ } // ____________________________________________________
- if (jMess instanceof BytesMessage)
- {
- BytesMessage jBytes = (BytesMessage) jMess;
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] ba = new byte[1000];
- int iQread;
- while (-1 != (iQread = jBytes.readBytes(ba)))
- if (iQread > 0) out.write(ba, 0, iQread);
- out.close();
- return out.toByteArray();
- }
+ protected final static Logger _logger = Logger
+ .getLogger(JmsGatewayListener.class);
- if (jMess instanceof ObjectMessage)
- return ((ObjectMessage) jMess).getObject().toString()
- .getBytes();
- _logger
- .warn("Message type " + jMess.getClass().getSimpleName() + " not supported - Message is ignored");
- return null;
- }
- } // ____________________________________________________
+ protected String _queueName;
- protected final static Logger _logger = Logger
- .getLogger(JmsGatewayListener.class);
+ protected QueueConnection _queueConnection;
- protected String _queueName;
+ protected QueueSession _queueSession;
- protected QueueConnection _queueConnection;
+ protected Queue _queue;
- protected QueueSession _queueSession;
+ protected MessageConsumer _messageReceiver;
- protected Queue _queue;
+ protected String _messageSelector;
- protected MessageConsumer _messageReceiver;
+ protected ConfigTree _config;
- protected String _messageSelector;
+ protected final long _sleepForRetries; // milliseconds
- protected ConfigTree _config;
+ protected String _serviceCategory, _serviceName;
- protected final long _sleepForRetries; // milliseconds
+ protected String _targetServiceCategory, _targetServiceName;
- protected String _serviceCategory, _serviceName;
+ protected EPR _myEpr;
- protected String _targetServiceCategory, _targetServiceName;
+ protected Collection<EPR> _targetEprs;
- protected EPR _myEpr;
+ protected String _composerName;
- protected Collection<EPR> _targetEprs;
+ protected Class _composerClass;
- protected String _composerName;
+ protected Object _composer;
- protected Class _composerClass;
+ protected Method _processMethod;
- protected Object _composer;
-
- protected Method _processMethod;
-
protected Courier _courier;
}
More information about the jboss-svn-commits mailing list