[jboss-svn-commits] JBL Code SVN: r33576 - in labs/jbossesb/trunk/product: docs and 7 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jun 21 16:59:13 EDT 2010
Author: tfennelly
Date: 2010-06-21 16:59:13 -0400 (Mon, 21 Jun 2010)
New Revision: 33576
Modified:
labs/jbossesb/trunk/product/.classpath
labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
https://jira.jboss.org/browse/JBESB-1890
Unable to configure jms-message-filter as durable subscriber to a Topic
Modified: labs/jbossesb/trunk/product/.classpath
===================================================================
--- labs/jbossesb/trunk/product/.classpath 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/.classpath 2010-06-21 20:59:13 UTC (rev 33576)
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/" kind="src" path="rosetta/src"/>
+ <classpathentry kind="src" path="samples/quickstarts/jms_topic/src"/>
+ <classpathentry kind="src" path="samples/quickstarts/helloworld/src"/>
<classpathentry excluding="**/.svn/" kind="src" path="rosetta/tests/resources"/>
<classpathentry excluding="**/.svn/" kind="src" path="rosetta/tests/src"/>
<classpathentry excluding="**/.svn/" kind="src" path="services/jbossesb/src/main/java"/>
Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)
Modified: labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd
===================================================================
--- labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd 2010-06-21 20:59:13 UTC (rev 33576)
@@ -1009,6 +1009,24 @@
<xsd:element maxOccurs="1" minOccurs="0"
ref="jesb:jms-message-filter" />
</xsd:sequence>
+ <xsd:attribute name="clientId" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation xml:lang="en">
+ Client ID to be associated with the connection. Used to associate a connection and its objects with
+ state maintained on behalf of the client by a provider e.g. durable subscriptions.
+ <p/>
+ If a clientId is required (e.g. when a 'durableSubscriptionName' is specified) but none is configured,
+ it defaults to the listener name.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="durableSubscriptionName" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation xml:lang="en">
+ Durable subscription name.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -33,6 +33,7 @@
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.xml.parsers.ParserConfigurationException;
@@ -609,7 +610,14 @@
if (destination == null) {
throw new CourierException("Could not locate destination: " + destinationName);
}
- _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
+
+ String durableSubscriptionName = _epr.getDurableSubscriptionName();
+ if(durableSubscriptionName != null && destination instanceof Topic) {
+ _messageConsumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, _epr.getMessageSelector(), false);
+ } else {
+ _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
+ }
+
success = true;
} finally {
NamingContextPool.releaseNamingContext(oJndiCtx) ;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -82,6 +82,11 @@
private int maxSessions = DEFAULT_POOL_SIZE; //TODO Make this manageable
/**
+ * Connection ClientId.
+ */
+ private String clientId;
+
+ /**
* The max number of sessions per connection. Defaults to "maxSessions".
*/
private int maxSessionsPerConnection;
@@ -140,6 +145,8 @@
} else if(maxXASessionsPerConnection > maxSessionsPerConnection) {
throw new ConnectionException("Invalid '" + JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION + "' configuration value '" + maxXASessionsPerConnection + "'. Cannot be greater than the configured value for '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "', which is " + maxSessionsPerConnection + ".");
}
+
+ clientId = poolKey.get(JMSEpr.CLIENT_ID);
}
private int getIntPoolConfig(Map<String, String> poolKey, String configKey, int defaultVal) throws ConnectionException {
@@ -763,6 +770,10 @@
{
throw new ConnectionException("Unknown factory connection type: " + factoryConnection.getClass().getCanonicalName());
}
+
+ if(clientId != null) {
+ jmsConnection.setClientID(clientId);
+ }
jmsConnection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException arg0)
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -101,6 +101,11 @@
public static final String JNDI_PREFIXES = "jndi-prefixes";
+ public static final String CLIENT_ID = "client-id";
+
+ public static final String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
+
/**
* JMS Client acknowledgment mode configuration tag
*/
@@ -636,12 +641,21 @@
{
return getAddr().getExtensionValue(CONNECTION_FACTORY_TAG);
}
+
+ public final String getClientId()
+ {
+ return getAddr().getExtensionValue(CLIENT_ID);
+ }
+
+ public final String getDurableSubscriptionName()
+ {
+ return getAddr().getExtensionValue(DURABLE_SUBSCRIPTION_NAME);
+ }
/**
* @return the jndi context factory for this EPR, or <code>null</code> if
* none is set.
*/
-
public final Properties getJndiEnvironment()
{
final Properties properties = new Properties();
@@ -659,6 +673,8 @@
if(tag.equals(JMSEpr.MAX_SESSIONS_PER_CONNECTION) || tag.equals(JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION)) {
properties.put(tag, extension.getValue());
+ } else if(tag.equals(JMSEpr.CLIENT_ID)) {
+ properties.put(tag, extension.getValue());
} else {
for(String jndiPrefix: jndiPrefixes)
{
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -169,6 +169,9 @@
tree.mapTo(environment, JMSEpr.MAX_SESSIONS_PER_CONNECTION);
tree.mapTo(environment, JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION);
+
+ tree.mapTo(environment, JMSEpr.CLIENT_ID);
+ tree.mapTo(environment, JMSEpr.DURABLE_SUBSCRIPTION_NAME);
String jmsFactoryClass = getAttrAndWarn(tree,
JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/JmsListenerMapper.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -120,7 +120,7 @@
// Map EPR related attributes onto the listener - from the bus and provider and listener.
// Note: This will change - the Gateways will also support the EPR element...
- mapJmsEprProperties(listenerNode, provider, messageFilter);
+ mapJmsEprProperties(listenerNode, provider, bus, listener, messageFilter);
listenerNode.setAttribute(ListenerTagNames.PROTOCOL_TAG, JMSEpr.JMS_PROTOCOL);
MapperUtil.mapEPRProperties(listener, listenerNode, model);
@@ -129,7 +129,7 @@
// Map EPR related attributes onto the EPR - from the bus and provider and listener...
- mapJmsEprProperties(eprNode, provider, messageFilter);
+ mapJmsEprProperties(eprNode, provider, bus, listener, messageFilter);
eprNode.setAttribute(ListenerTagNames.PROTOCOL_TAG, JMSEpr.JMS_PROTOCOL);
MapperUtil.mapEPRProperties(listener, eprNode, model);
@@ -143,7 +143,7 @@
return listenerNode;
}
- private static void mapJmsEprProperties(Element toElement, JmsProviderType provider, JmsMessageFilter messageFilter) {
+ private static void mapJmsEprProperties(Element toElement, JmsProviderType provider, JmsBus bus, JmsListener listener, JmsMessageFilter messageFilter) throws ConfigurationException {
if(messageFilter.getDestType() == DestType.QUEUE) {
toElement.setAttribute(JMSEpr.DESTINATION_TYPE_TAG, JMSEpr.QUEUE_TYPE);
} else {
@@ -160,6 +160,22 @@
toElement.setAttribute(JMSEpr.JMS_SECURITY_PRINCIPAL_TAG, messageFilter.getJmsSecurityPrincipal());
toElement.setAttribute(JMSEpr.JMS_SECURITY_CREDENTIAL_TAG, messageFilter.getJmsSecurityCredential());
toElement.setAttribute(JMSEpr.TRANSACTED_TAG, Boolean.toString( messageFilter.getTransacted()));
+
+ String durableSubsName = listener.getDurableSubscriptionName();
+ if(durableSubsName != null) {
+ if(messageFilter.getDestType() != DestType.TOPIC) {
+ throw new ConfigurationException("JMS listener configuration on JMS Bus '" + bus.getBusid() + "' defines a durable subscription name. Durable subscribers are only supported for JMS Topics.");
+ }
+
+ toElement.setAttribute(JMSEpr.DURABLE_SUBSCRIPTION_NAME, durableSubsName);
+
+ if(listener.getClientId() != null) {
+ toElement.setAttribute(JMSEpr.CLIENT_ID, listener.getClientId());
+ } else {
+ // If not defined on the listener, default the clientId to the configured listener name...
+ toElement.setAttribute(JMSEpr.CLIENT_ID, listener.getName());
+ }
+ }
}
private static void mapJmsJcaAttributes(final JmsListener listener,
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2010-06-21 16:31:12 UTC (rev 33575)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2010-06-21 20:59:13 UTC (rev 33576)
@@ -36,6 +36,7 @@
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -70,7 +71,9 @@
public class JmsGatewayListener extends AbstractThreadedManagedLifecycle {
- public JmsGatewayListener(ConfigTree listenerConfig)
+ private String durableSubscriptionName;
+
+ public JmsGatewayListener(ConfigTree listenerConfig)
throws ConfigurationException {
super(listenerConfig);
_config = listenerConfig;
@@ -104,6 +107,8 @@
} catch (MessageDeliverException e) {
throw new ManagedLifecycleException(e);
}
+
+ durableSubscriptionName = _config.getAttribute(JMSEpr.DURABLE_SUBSCRIPTION_NAME);
try {
prepareMessageReceiver();
@@ -351,6 +356,7 @@
_config.mapTo(environment, JMSEpr.JNDI_PKG_PREFIX_TAG, Context.URL_PKG_PREFIXES);
_config.mapTo(environment, JMSEpr.MAX_SESSIONS_PER_CONNECTION);
_config.mapTo(environment, JMSEpr.MAX_XA_SESSIONS_PER_CONNECTION);
+ _config.mapTo(environment, JMSEpr.CLIENT_ID);
Set<String> names = _config.getAttributeNames();
final String jndiPrefixesValue = _config.getAttribute(JMSEpr.JNDI_PREFIXES) ;
@@ -434,7 +440,11 @@
NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
- jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
+ if(durableSubscriptionName != null && jmsDestination instanceof Topic) {
+ jmsMessageConsumer = jmsSession.createDurableSubscriber((Topic)jmsDestination, durableSubscriptionName, _messageSelector, false);
+ } else {
+ jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
+ }
} // ________________________________
/**
More information about the jboss-svn-commits
mailing list