[jboss-svn-commits] JBL Code SVN: r33576 - in labs/jbossesb/trunk/product: docs and 7 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jun 21 16:59:13 EDT 2010


Author: tfennelly
Date: 2010-06-21 16:59:13 -0400 (Mon, 21 Jun 2010)
New Revision: 33576

Modified:
   labs/jbossesb/trunk/product/.classpath
   labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
   labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
https://jira.jboss.org/browse/JBESB-1890
Unable to configure jms-message-filter as durable subscriber to a Topic


Modified: labs/jbossesb/trunk/product/.classpath
===================================================================
--- labs/jbossesb/trunk/product/.classpath	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/.classpath	2010-06-21 20:59:13 UTC (rev 33576)
@@ -1,6 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry excluding="**/.svn/" kind="src" path="rosetta/src"/>
+	<classpathentry kind="src" path="samples/quickstarts/jms_topic/src"/>
+	<classpathentry kind="src" path="samples/quickstarts/helloworld/src"/>
 	<classpathentry excluding="**/.svn/" kind="src" path="rosetta/tests/resources"/>
 	<classpathentry excluding="**/.svn/" kind="src" path="rosetta/tests/src"/>
 	<classpathentry excluding="**/.svn/" kind="src" path="services/jbossesb/src/main/java"/>

Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)

Modified: labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd
===================================================================
--- labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd	2010-06-21 20:59:13 UTC (rev 33576)
@@ -1009,6 +1009,24 @@
 						<xsd:element maxOccurs="1" minOccurs="0"
 							ref="jesb:jms-message-filter" />
 					</xsd:sequence>
+					<xsd:attribute name="clientId" type="xsd:string" use="optional">
+						<xsd:annotation>
+							<xsd:documentation xml:lang="en">
+								Client ID to be associated with the connection.  Used to associate a connection and its objects with
+								state maintained on behalf of the client by a provider e.g. durable subscriptions.
+								<p/>
+								If a clientId is required (e.g. when a 'durableSubscriptionName' is specified) but none has been configured,
+								it will default to the listener name.
+							</xsd:documentation>
+						</xsd:annotation>
+					</xsd:attribute>			
+					<xsd:attribute name="durableSubscriptionName" type="xsd:string" use="optional">
+						<xsd:annotation>
+							<xsd:documentation xml:lang="en">
+								Durable subscription name.
+							</xsd:documentation>
+						</xsd:annotation>
+					</xsd:attribute>			
 				</xsd:extension>
 			</xsd:complexContent>
 		</xsd:complexType>

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -33,6 +33,7 @@
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.xml.parsers.ParserConfigurationException;
@@ -609,7 +610,14 @@
                         if (destination == null) {
                             throw new CourierException("Could not locate destination: " + destinationName);
                         }
-                        _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
+                        
+                        String durableSubscriptionName = _epr.getDurableSubscriptionName();
+                        if(durableSubscriptionName != null && destination instanceof Topic) {
+                        	_messageConsumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, _epr.getMessageSelector(), false);
+                        } else {
+                            _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
+                        }
+                        
                         success = true;
                     } finally {
                         NamingContextPool.releaseNamingContext(oJndiCtx) ;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -82,6 +82,11 @@
     private int maxSessions = DEFAULT_POOL_SIZE;    //TODO Make this manageable
 
     /**
+     * Connection ClientId.
+     */
+    private String clientId;
+
+    /**
      * The max number of sessions per connection.  Defaults to "maxSessions".
      */
     private int maxSessionsPerConnection;
@@ -140,6 +145,8 @@
         } else if(maxXASessionsPerConnection > maxSessionsPerConnection) {
             throw new ConnectionException("Invalid '" + JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION + "' configuration value '" + maxXASessionsPerConnection + "'.  Cannot be greater than the configured value for '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "', which is " + maxSessionsPerConnection + ".");
         }
+        
+        clientId = poolKey.get(JMSEpr.CLIENT_ID);
     }
 
     private int getIntPoolConfig(Map<String, String> poolKey, String configKey, int defaultVal) throws ConnectionException {
@@ -763,6 +770,10 @@
                     {
                         throw new ConnectionException("Unknown factory connection type: " + factoryConnection.getClass().getCanonicalName());
                     }
+                    
+                    if(clientId != null) {
+                    	jmsConnection.setClientID(clientId);
+                    }
         
                     jmsConnection.setExceptionListener(new ExceptionListener() {
                         public void onException(JMSException arg0)

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -101,6 +101,11 @@
 	
 	public static final String JNDI_PREFIXES = "jndi-prefixes";
 	
+	public static final String CLIENT_ID = "client-id";
+	
+	public static final String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+	
+	
 	/**
 	 * JMS Client acknowledgment mode configuration tag
 	 */
@@ -636,12 +641,21 @@
 	{
 		return getAddr().getExtensionValue(CONNECTION_FACTORY_TAG);
 	}
+	
+	public final String getClientId()
+	{
+		return getAddr().getExtensionValue(CLIENT_ID);
+	}
+		
+	public final String getDurableSubscriptionName()
+	{
+		return getAddr().getExtensionValue(DURABLE_SUBSCRIPTION_NAME);
+	}
     
     /**
      * @return the jndi context factory for this EPR, or <code>null</code> if
      *         none is set.
      */
-
     public final Properties getJndiEnvironment()
     {
         final Properties properties = new Properties();
@@ -659,6 +673,8 @@
 
             if(tag.equals(JMSEpr.MAX_SESSIONS_PER_CONNECTION) || tag.equals(JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION)) {
                 properties.put(tag, extension.getValue());
+            } else if(tag.equals(JMSEpr.CLIENT_ID)) {
+                properties.put(tag, extension.getValue());
             } else {
                 for(String jndiPrefix: jndiPrefixes)
                 {

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -169,6 +169,9 @@
 
             tree.mapTo(environment, JMSEpr.MAX_SESSIONS_PER_CONNECTION);
             tree.mapTo(environment, JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION);
+            
+            tree.mapTo(environment, JMSEpr.CLIENT_ID);
+            tree.mapTo(environment, JMSEpr.DURABLE_SUBSCRIPTION_NAME);
 
             String jmsFactoryClass = getAttrAndWarn(tree,
 					JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -120,7 +120,7 @@
 
 			// Map EPR related attributes onto the listener - from the bus and provider and listener.
 			// Note: This will change - the Gateways will also support the EPR element...
-			mapJmsEprProperties(listenerNode, provider, messageFilter);
+			mapJmsEprProperties(listenerNode, provider, bus, listener, messageFilter);
                         listenerNode.setAttribute(ListenerTagNames.PROTOCOL_TAG, JMSEpr.JMS_PROTOCOL);
 
 			MapperUtil.mapEPRProperties(listener, listenerNode, model);
@@ -129,7 +129,7 @@
 
 
 			// Map EPR related attributes onto the EPR - from the bus and provider and listener...
-			mapJmsEprProperties(eprNode, provider, messageFilter);
+			mapJmsEprProperties(eprNode, provider, bus, listener, messageFilter);
 			eprNode.setAttribute(ListenerTagNames.PROTOCOL_TAG, JMSEpr.JMS_PROTOCOL);
 
 			MapperUtil.mapEPRProperties(listener, eprNode, model);
@@ -143,7 +143,7 @@
 		return listenerNode;
 	}
 
-    private static void mapJmsEprProperties(Element toElement, JmsProviderType provider, JmsMessageFilter messageFilter) {
+    private static void mapJmsEprProperties(Element toElement, JmsProviderType provider, JmsBus bus, JmsListener listener, JmsMessageFilter messageFilter) throws ConfigurationException {
 		if(messageFilter.getDestType() == DestType.QUEUE) {
             toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.QUEUE_TYPE);
         } else {
@@ -160,6 +160,22 @@
 		toElement.setAttribute(JMSEpr.JMS_SECURITY_PRINCIPAL_TAG, messageFilter.getJmsSecurityPrincipal());
 		toElement.setAttribute(JMSEpr.JMS_SECURITY_CREDENTIAL_TAG, messageFilter.getJmsSecurityCredential());
 		toElement.setAttribute(JMSEpr.TRANSACTED_TAG, Boolean.toString( messageFilter.getTransacted()));
+		
+		String durableSubsName = listener.getDurableSubscriptionName();
+		if(durableSubsName != null) {
+			if(messageFilter.getDestType() != DestType.TOPIC) {
+				throw new ConfigurationException("JMS listener configuration on JMS Bus '" + bus.getBusid() + "' defines a durable subscription name.  Durable subscribers are only supported for JMS Topics.");				
+			}
+			
+			toElement.setAttribute(JMSEpr.DURABLE_SUBSCRIPTION_NAME, durableSubsName);
+			
+			if(listener.getClientId() != null) {
+				toElement.setAttribute(JMSEpr.CLIENT_ID, listener.getClientId());
+			} else {
+				// If not defined on the listener, default the clientId to the configured listener name...
+				toElement.setAttribute(JMSEpr.CLIENT_ID, listener.getName());
+			}
+		}		
 	}
 
     private static void mapJmsJcaAttributes(final JmsListener listener,

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2010-06-21 20:59:13 UTC (rev 33576)
@@ -36,6 +36,7 @@
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -70,7 +71,9 @@
 
 public class JmsGatewayListener extends AbstractThreadedManagedLifecycle {
 
-    public JmsGatewayListener(ConfigTree listenerConfig)
+    private String durableSubscriptionName;
+
+	public JmsGatewayListener(ConfigTree listenerConfig)
             throws ConfigurationException {
         super(listenerConfig);
         _config = listenerConfig;
@@ -104,6 +107,8 @@
         } catch (MessageDeliverException e) {
         	throw new ManagedLifecycleException(e);
         }
+
+        durableSubscriptionName = _config.getAttribute(JMSEpr.DURABLE_SUBSCRIPTION_NAME);
         
         try {
             prepareMessageReceiver();
@@ -351,6 +356,7 @@
         _config.mapTo(environment, JMSEpr.JNDI_PKG_PREFIX_TAG, Context.URL_PKG_PREFIXES);
         _config.mapTo(environment, JMSEpr.MAX_SESSIONS_PER_CONNECTION);
         _config.mapTo(environment, JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION);
+        _config.mapTo(environment, JMSEpr.CLIENT_ID);
 
         Set<String> names = _config.getAttributeNames();
         final String jndiPrefixesValue = _config.getAttribute(JMSEpr.JNDI_PREFIXES) ;
@@ -434,7 +440,11 @@
             NamingContextPool.releaseNamingContext(oJndiCtx) ;
         }
 
-        jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
+        if(durableSubscriptionName != null && jmsDestination instanceof Topic) {
+        	jmsMessageConsumer = jmsSession.createDurableSubscriber((Topic)jmsDestination, durableSubscriptionName, _messageSelector, false);
+        } else {
+        	jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
+        }
     } // ________________________________
 
     /**



More information about the jboss-svn-commits mailing list