[jboss-svn-commits] JBL Code SVN: r15383 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss: soa/esb/listeners/config/mappers and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Sep 26 13:11:44 EDT 2007


Author: tfennelly
Date: 2007-09-26 13:11:44 -0400 (Wed, 26 Sep 2007)
New Revision: 15383

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
JmsGatewayListener doesn't support JMS Topics: http://jira.jboss.com/jira/browse/JBESB-1059
Some tests and a quickstart still remain to be done.

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2007-09-26 17:11:44 UTC (rev 15383)
@@ -159,7 +159,7 @@
      * @return Connection to be used
      * @throws ConnectionException
      */
-    private synchronized Session getSession() throws NamingException, JMSException, ConnectionException
+    public synchronized Session getSession() throws NamingException, JMSException, ConnectionException
     {
         Session session = null;
         int waitInSeconds = 0;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java	2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/JmsListenerMapper.java	2007-09-26 17:11:44 UTC (rev 15383)
@@ -106,8 +106,12 @@
 	}
 
 	private static void mapJmsEprProperties(Element toElement, JmsProvider provider, JmsMessageFilter messageFilter) {
-		toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, messageFilter.getDestType().toString().toLowerCase());
-		toElement.setAttribute(JMSEpr.DESTINATION_NAME_TAG, messageFilter.getDestName());
+		if(messageFilter.getDestType() == JmsMessageFilter.DestType.QUEUE) {
+            toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.QUEUE_TYPE);
+        } else {
+            toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.TOPIC_TYPE);
+        }
+        toElement.setAttribute(JMSEpr.DESTINATION_NAME_TAG, messageFilter.getDestName());
 		toElement.setAttribute(JMSEpr.MESSAGE_SELECTOR_TAG, messageFilter.getSelector());
 		toElement.setAttribute(JMSEpr.CONNECTION_FACTORY_TAG, provider.getConnectionFactory());
 		toElement.setAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG, provider.getJndiContextFactory());

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java	2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java	2007-09-26 17:11:44 UTC (rev 15383)
@@ -23,10 +23,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import javax.management.MBeanServer;
 
@@ -465,6 +462,11 @@
             String propertyNames[] = properties.getNames();
             Map responseMap = invocationRequest.getReturnPayload();
 
+            if(responseMap == null) {
+                responseMap = new LinkedHashMap();
+                invocationRequest.setReturnPayload(responseMap);
+            }
+            
             for(String name : propertyNames) {
                 Object value = properties.getProperty(name);
                 if(value != null) {

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2007-09-26 16:06:46 UTC (rev 15382)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2007-09-26 17:11:44 UTC (rev 15383)
@@ -31,10 +31,7 @@
 import java.util.Properties;
 import java.util.Set;
 
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
+import javax.jms.*;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -222,21 +219,29 @@
      * @throws ManagedLifecycleException for errors while destroying.
      */
     protected void doThreadedDestroy() throws ManagedLifecycleException {
-        if (_serviceName != null) {
-            RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
-        }
+        cleanup();
+    }
 
-        if (_messageReceiver != null) {
+    private void cleanup() {
+        try {
+            if (_serviceName != null) {
+                RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
+            }
+        } finally {
             try {
-                _messageReceiver.close();
+                if (jmsMessageConsumer != null) {
+                    try {
+                        jmsMessageConsumer.close();
+                    }
+                    catch (final JMSException jmse) {
+                    } // ignore
+                }
+            } finally {
+                if (jmsSession != null) {
+                    jmsConnectionPool.closeSession(jmsSession);
+                }
             }
-            catch (final JMSException jmse) {
-            } // ignore
         }
-
-        if (_queueSession != null) {
-            _pool.closeSession(_queueSession);
-        }
     }
 
     /**
@@ -259,10 +264,10 @@
         if (_targetServiceName == null)
         	throw new ConfigurationException("No service name defined!");
         
-        _queueName = ListenerUtil.getValue(_config,
+        jmsDestinationName = ListenerUtil.getValue(_config,
                 JMSEpr.DESTINATION_NAME_TAG, null);
 
-        if (_queueName == null)
+        if (jmsDestinationName == null)
         	throw new ConfigurationException("No queue name defined!");
         
         resolveComposerClass();
@@ -310,8 +315,8 @@
 
     private void prepareMessageReceiver() throws ConfigurationException,
             JMSException, ConnectionException {
-        _queueSession = null;
-        _queue = null;
+        jmsSession = null;
+        jmsDestination = null;
 
         Properties environment = new Properties();
 
@@ -349,35 +354,48 @@
         _serviceCategory = _config
                 .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
         _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-        _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
-                _queueName, sFactClass, environment, _messageSelector);
 
-        _pool = JmsConnectionPoolContainer.getPool(environment, sFactClass,
-                JMSEpr.QUEUE_TYPE);
+        String destType = _config.getAttribute(JMSEpr.DESTINATION_TYPE_TAG);
+        _myEpr = (null == _serviceName) ? null : new JMSEpr(destType,
+                jmsDestinationName, sFactClass, environment, _messageSelector);
 
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, destType);
+
         try {
-            _queueSession = _pool.getQueueSession();
+            jmsSession = jmsConnectionPool.getSession();
         }
         catch (NamingException ne) {
-            throw new ConfigurationException(
-                    "Failed to obtain queue session from pool", ne);
+            throw new ConfigurationException("Failed to obtain queue session from pool", ne);
         }
 
         try {
-            _queue = (Queue) oJndiCtx.lookup(_queueName);
+            jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
         }
         catch (NamingException nex) {
             try {
                 oJndiCtx = NamingContext.getServerContext(environment);
-                _queue = (Queue) oJndiCtx.lookup(_queueName);
+                jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
             }
             catch (NamingException ne) {
-                _queue = _queueSession.createQueue(_queueName);
+                if(jmsSession instanceof QueueSession) {
+                    jmsDestination = jmsSession.createQueue(jmsDestinationName);
+                } else {
+                    jmsDestination = jmsSession.createTopic(jmsDestinationName);
+                }
             }
         }
 
-        _messageReceiver = _queueSession.createReceiver(_queue,
-                _messageSelector);
+        if(jmsSession instanceof QueueSession && jmsDestination instanceof Queue) {
+            jmsMessageConsumer = ((QueueSession)jmsSession).createReceiver((Queue)jmsDestination, _messageSelector);
+        } else if(jmsSession instanceof TopicSession && jmsDestination instanceof Topic) {
+            jmsMessageConsumer = ((TopicSession)jmsSession).createSubscriber((Topic)jmsDestination, _messageSelector, false);
+        } else {
+            try {
+                throw new ConfigurationException("The JMS destination identified by name '" + jmsDestinationName + "' must match it's configured destination type '" + jmsDestinationName + "'.");
+            } finally {
+                cleanup();
+            }
+        }
     } // ________________________________
 
     /**
@@ -388,7 +406,7 @@
     protected javax.jms.Message receiveOne() {
         while (isRunning())
             try {
-                javax.jms.Message ret = _messageReceiver.receive(200);
+                javax.jms.Message ret = jmsMessageConsumer.receive(200);
                 if (null != ret)
                     return ret;
             }
@@ -426,13 +444,13 @@
     protected final static Logger _logger = Logger
             .getLogger(JmsGatewayListener.class);
 
-    protected String _queueName;
+    protected String jmsDestinationName;
 
-    protected QueueSession _queueSession;
+    protected Session jmsSession;
 
-    protected Queue _queue;
+    protected Destination jmsDestination;
 
-    protected MessageConsumer _messageReceiver;
+    protected MessageConsumer jmsMessageConsumer;
 
     protected String _messageSelector;
 
@@ -456,7 +474,7 @@
 
     protected Courier _courier;
 
-    protected JmsConnectionPool _pool;
+    protected JmsConnectionPool jmsConnectionPool;
 
     /**
      * The minimum error delay.




More information about the jboss-svn-commits mailing list