[jboss-svn-commits] JBL Code SVN: r15383 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss: soa/esb/listeners/config/mappers and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Sep 26 13:11:44 EDT 2007
Author: tfennelly
Date: 2007-09-26 13:11:44 -0400 (Wed, 26 Sep 2007)
New Revision: 15383
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
JmsGatewayListener doesn't support JMS Topics: http://jira.jboss.com/jira/browse/JBESB-1059
Still some tests/quickstart to do.
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2007-09-26 17:11:44 UTC (rev 15383)
@@ -159,7 +159,7 @@
* @return Connection to be used
* @throws ConnectionException
*/
- private synchronized Session getSession() throws NamingException, JMSException, ConnectionException
+ public synchronized Session getSession() throws NamingException, JMSException, ConnectionException
{
Session session = null;
int waitInSeconds = 0;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java 2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java 2007-09-26 17:11:44 UTC (rev 15383)
@@ -106,8 +106,12 @@
}
private static void mapJmsEprProperties(Element toElement, JmsProvider provider, JmsMessageFilter messageFilter) {
- toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, messageFilter.getDestType().toString().toLowerCase());
- toElement.setAttribute(JMSEpr.DESTINATION_NAME_TAG, messageFilter.getDestName());
+ if(messageFilter.getDestType() == JmsMessageFilter.DestType.QUEUE) {
+ toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.QUEUE_TYPE);
+ } else {
+ toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.TOPIC_TYPE);
+ }
+ toElement.setAttribute(JMSEpr.DESTINATION_NAME_TAG, messageFilter.getDestName());
toElement.setAttribute(JMSEpr.MESSAGE_SELECTOR_TAG, messageFilter.getSelector());
toElement.setAttribute(JMSEpr.CONNECTION_FACTORY_TAG, provider.getConnectionFactory());
toElement.setAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG, provider.getJndiContextFactory());
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java 2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java 2007-09-26 17:11:44 UTC (rev 15383)
@@ -23,10 +23,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import javax.management.MBeanServer;
@@ -465,6 +462,11 @@
String propertyNames[] = properties.getNames();
Map responseMap = invocationRequest.getReturnPayload();
+ if(responseMap == null) {
+ responseMap = new LinkedHashMap();
+ invocationRequest.setReturnPayload(responseMap);
+ }
+
for(String name : propertyNames) {
Object value = properties.getProperty(name);
if(value != null) {
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-09-26 17:11:44 UTC (rev 15383)
@@ -31,10 +31,7 @@
import java.util.Properties;
import java.util.Set;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
+import javax.jms.*;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -222,21 +219,29 @@
* @throws ManagedLifecycleException for errors while destroying.
*/
protected void doThreadedDestroy() throws ManagedLifecycleException {
- if (_serviceName != null) {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
+ cleanup();
+ }
- if (_messageReceiver != null) {
+ private void cleanup() {
+ try {
+ if (_serviceName != null) {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+ }
+ } finally {
try {
- _messageReceiver.close();
+ if (jmsMessageConsumer != null) {
+ try {
+ jmsMessageConsumer.close();
+ }
+ catch (final JMSException jmse) {
+ } // ignore
+ }
+ } finally {
+ if (jmsSession != null) {
+ jmsConnectionPool.closeSession(jmsSession);
+ }
}
- catch (final JMSException jmse) {
- } // ignore
}
-
- if (_queueSession != null) {
- _pool.closeSession(_queueSession);
- }
}
/**
@@ -259,10 +264,10 @@
if (_targetServiceName == null)
throw new ConfigurationException("No service name defined!");
- _queueName = ListenerUtil.getValue(_config,
+ jmsDestinationName = ListenerUtil.getValue(_config,
JMSEpr.DESTINATION_NAME_TAG, null);
- if (_queueName == null)
+ if (jmsDestinationName == null)
throw new ConfigurationException("No queue name defined!");
resolveComposerClass();
@@ -310,8 +315,8 @@
private void prepareMessageReceiver() throws ConfigurationException,
JMSException, ConnectionException {
- _queueSession = null;
- _queue = null;
+ jmsSession = null;
+ jmsDestination = null;
Properties environment = new Properties();
@@ -349,35 +354,48 @@
_serviceCategory = _config
.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
_serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
- _queueName, sFactClass, environment, _messageSelector);
- _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass,
- JMSEpr.QUEUE_TYPE);
+ String destType = _config.getAttribute(JMSEpr.DESTINATION_TYPE_TAG);
+ _myEpr = (null == _serviceName) ? null : new JMSEpr(destType,
+ jmsDestinationName, sFactClass, environment, _messageSelector);
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, destType);
+
try {
- _queueSession = _pool.getQueueSession();
+ jmsSession = jmsConnectionPool.getSession();
}
catch (NamingException ne) {
- throw new ConfigurationException(
- "Failed to obtain queue session from pool", ne);
+ throw new ConfigurationException("Failed to obtain queue session from pool", ne);
}
try {
- _queue = (Queue) oJndiCtx.lookup(_queueName);
+ jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
}
catch (NamingException nex) {
try {
oJndiCtx = NamingContext.getServerContext(environment);
- _queue = (Queue) oJndiCtx.lookup(_queueName);
+ jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
}
catch (NamingException ne) {
- _queue = _queueSession.createQueue(_queueName);
+ if(jmsSession instanceof QueueSession) {
+ jmsDestination = jmsSession.createQueue(jmsDestinationName);
+ } else {
+ jmsDestination = jmsSession.createTopic(jmsDestinationName);
+ }
}
}
- _messageReceiver = _queueSession.createReceiver(_queue,
- _messageSelector);
+ if(jmsSession instanceof QueueSession && jmsDestination instanceof Queue) {
+ jmsMessageConsumer = ((QueueSession)jmsSession).createReceiver((Queue)jmsDestination, _messageSelector);
+ } else if(jmsSession instanceof TopicSession && jmsDestination instanceof Topic) {
+ jmsMessageConsumer = ((TopicSession)jmsSession).createSubscriber((Topic)jmsDestination, _messageSelector, false);
+ } else {
+ try {
+ throw new ConfigurationException("The JMS destination identified by name '" + jmsDestinationName + "' must match it's configured destination type '" + jmsDestinationName + "'.");
+ } finally {
+ cleanup();
+ }
+ }
} // ________________________________
/**
@@ -388,7 +406,7 @@
protected javax.jms.Message receiveOne() {
while (isRunning())
try {
- javax.jms.Message ret = _messageReceiver.receive(200);
+ javax.jms.Message ret = jmsMessageConsumer.receive(200);
if (null != ret)
return ret;
}
@@ -426,13 +444,13 @@
protected final static Logger _logger = Logger
.getLogger(JmsGatewayListener.class);
- protected String _queueName;
+ protected String jmsDestinationName;
- protected QueueSession _queueSession;
+ protected Session jmsSession;
- protected Queue _queue;
+ protected Destination jmsDestination;
- protected MessageConsumer _messageReceiver;
+ protected MessageConsumer jmsMessageConsumer;
protected String _messageSelector;
@@ -456,7 +474,7 @@
protected Courier _courier;
- protected JmsConnectionPool _pool;
+ protected JmsConnectionPool jmsConnectionPool;
/**
* The minimum error delay.
More information about the jboss-svn-commits
mailing list