[jboss-svn-commits] JBL Code SVN: r23981 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 20 06:20:41 EST 2008
Author: kevin.conner at jboss.com
Date: 2008-11-20 06:20:41 -0500 (Thu, 20 Nov 2008)
New Revision: 23981
Modified:
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
Log:
Handle session/providers: JBESB-2190
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-20 06:17:58 UTC (rev 23980)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-20 11:20:41 UTC (rev 23981)
@@ -45,6 +45,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -137,10 +138,6 @@
*/
private String queueName;
/**
- * JMS Queue setup.
- */
- private JMSSendQueueSetup queueSetup;
- /**
* Strategy for setting JMSProperties
*/
private final JMSPropertiesSetter jmsPropertiesStrategy ;
@@ -164,6 +161,25 @@
private String connectionFactory;
/**
+ * The pool to use for the jms routing.
+ */
+ private JmsConnectionPool pool ;
+ /**
+ * The jms target destination for routing.
+ */
+ private Destination jmsDestination ;
+ /**
+ * Thread local used for passing JmsSession between methods.
+ * This is to allow modifications without changing the API.
+ */
+ private ThreadLocal<JmsSession> SESSION = new ThreadLocal<JmsSession>() ;
+ /**
+ * The JMS reply to destination.
+ */
+ private String jmsReplyToName ;
+
+
+ /**
* Sole public constructor.
*
* @param propertiesTree Action properties.
@@ -192,10 +208,10 @@
if ( ttlStr != null )
timeToLive = Long.parseLong( ttlStr );
- jndiContextFactory = properties.getAttribute( JMSEpr.JNDI_CONTEXT_FACTORY_TAG );
- jndiUrl = properties.getAttribute( JMSEpr.JNDI_URL_TAG );
- jndiPkgPrefix = properties.getAttribute( JMSEpr.JNDI_PKG_PREFIX_TAG );
- connectionFactory = properties.getAttribute( JMSEpr.CONNECTION_FACTORY_TAG );
+ jndiContextFactory = properties.getAttribute( JMSEpr.JNDI_CONTEXT_FACTORY_TAG, Configuration.getJndiServerContextFactory());
+ jndiUrl = properties.getAttribute( JMSEpr.JNDI_URL_TAG, Configuration.getJndiServerURL());
+ jndiPkgPrefix = properties.getAttribute( JMSEpr.JNDI_PKG_PREFIX_TAG, Configuration.getJndiServerPkgPrefix());
+ connectionFactory = properties.getAttribute( JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
final String propertyStrategy = properties.getAttribute(PROPERTY_STRATEGY) ;
if (propertyStrategy == null) {
@@ -215,8 +231,17 @@
throw new ConfigurationException("'" + SECURITY_PRINCIPAL + "' must be accompanied by a '" + SECURITY_CREDITIAL + "'");
else if ( securityCredential != null && securityPrincipal == null )
throw new ConfigurationException("'" + SECURITY_CREDITIAL + "' must be accompanied by a '" + SECURITY_PRINCIPAL + "'");
-
- createQueueSetup( queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential );
+
+ final Properties environment = getEnvironment() ;
+ try {
+ pool = ( securityPrincipal != null ) ?
+ JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", securityPrincipal, securityCredential) :
+ JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
+ } catch (final ConnectionException ce) {
+ throw new ConfigurationException("Unexpected error obtaining JMS connection pool") ;
+ }
+
+ createQueueSetup(queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential);
}
/**
@@ -235,41 +260,46 @@
* @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
*/
public void route(Object message) throws ActionProcessingException {
-
- if(!(message instanceof org.jboss.soa.esb.message.Message)) {
- throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+ final JmsSession jmsSession ;
+ try {
+ jmsSession = pool.getSession() ;
+ } catch (final ConnectionException ce) {
+ throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+ } catch (NamingException ne) {
+ throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+ } catch (JMSException jmse) {
+ throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
}
-
- final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
-
+ SESSION.set(jmsSession) ;
try {
- Message jmsMessage = null;
-
- if ( unwrap ) {
- Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
- jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+ if(!(message instanceof org.jboss.soa.esb.message.Message)) {
+ throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
}
- else {
- jmsMessage = createObjectMessage(Util.serialize(esbMessage));
- }
+
+ final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
- setStringProperties(jmsMessage);
- setJMSProperties( esbMessage, jmsMessage );
- setJMSReplyTo( jmsMessage, esbMessage );
- send( jmsMessage );
-
- } catch(Exception e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Exception while sending message [").append(message).append("] to destination [");
-
- if (queueSetup != null)
- sb.append(queueSetup.destinationName).append("].");
- else
- sb.append("null ].");
-
- String errorMessage = sb.toString();
- logger.error(errorMessage, e);
- throw new ActionProcessingException(errorMessage, e);
+ try {
+ Message jmsMessage = null;
+
+ if ( unwrap ) {
+ Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
+ jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+ }
+ else {
+ jmsMessage = createObjectMessage(Util.serialize(esbMessage));
+ }
+
+ setStringProperties(jmsMessage);
+ setJMSProperties( esbMessage, jmsMessage );
+ send( jmsMessage );
+ } catch(Exception e) {
+ final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
+ logger.error(errorMessage);
+ throw new ActionProcessingException(errorMessage, e);
+ }
+ } finally {
+ SESSION.set(null) ;
+ pool.closeSession(jmsSession) ;
}
}
@@ -277,18 +307,18 @@
{
Message jmsMessage = null;
if(objectFromBody instanceof String) {
- jmsMessage = queueSetup.jmsSession.createTextMessage();
+ jmsMessage = SESSION.get().createTextMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueSetup.destinationName + "].");
+ logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueName + "].");
}
((TextMessage)jmsMessage).setText((String)objectFromBody);
} else if(objectFromBody instanceof byte[]) {
- jmsMessage = queueSetup.jmsSession.createBytesMessage();
+ jmsMessage = SESSION.get().createBytesMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueSetup.destinationName + "].");
+ logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueName + "].");
}
((BytesMessage)jmsMessage).writeBytes((byte[])objectFromBody);
@@ -301,7 +331,22 @@
protected void send( Message jmsMessage ) throws JMSException
{
- queueSetup.jmsProducer.send(jmsMessage);
+ final MessageProducer jmsProducer = SESSION.get().createProducer(jmsDestination) ;
+ try {
+ jmsProducer.setPriority(priority) ;
+ jmsProducer.setDeliveryMode(deliveryMode) ;
+ jmsProducer.setTimeToLive(timeToLive) ;
+
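+ // NOTE(review): reply-to handling is carried over from the previous implementation and looks broken: jmsReplyToName is only assigned by setJMSReplyTo(), which route() no longer calls in this revision. Confirm whether it is ever used.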
+ if (jmsReplyToName != null) {
+ final Destination jmsReplyToDestination = SESSION.get().createQueue(jmsReplyToName);
+ jmsMessage.setJMSReplyTo(jmsReplyToDestination);
+ }
+
+ jmsProducer.send(jmsMessage);
+ } finally {
+ jmsProducer.close() ;
+ }
}
/**
@@ -318,10 +363,10 @@
protected Message createObjectMessage(Object message) throws JMSException {
Message jmsMessage;
- jmsMessage = queueSetup.jmsSession.createObjectMessage();
+ jmsMessage = SESSION.get().createObjectMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending Object message: [" + message + "] to destination [" + queueSetup.destinationName + "].");
+ logger.debug("Sending Object message: [" + message + "] to destination [" + queueName + "].");
}
((ObjectMessage)jmsMessage).setObject((Serializable) message);
return jmsMessage;
@@ -353,17 +398,6 @@
return null;
}
- @Override
- protected void finalize() throws Throwable {
- queueSetup.close();
- super.finalize();
- }
-
- protected void createQueueSetup( String queueName ) throws ConfigurationException
- {
- createQueueSetup( queueName, null, null, null, null, null, null );
- }
-
protected void createQueueSetup( String queueName,
String jndiContextFactory,
String jndiUrl,
@@ -372,124 +406,65 @@
String securityPrincipal,
String securityCredential) throws ConfigurationException
{
- try {
- queueSetup = new JMSSendQueueSetup(queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential );
- queueSetup.setDeliveryMode( deliveryMode );
- queueSetup.setPriority( priority );
- queueSetup.setTimeToLive( timeToLive );
- if ( logger.isDebugEnabled() )
- {
- logger.debug( "JMSRouter DeliveryMode : " + deliveryMode);
- logger.debug( "JMSRouter Priority : " + priority);
- logger.debug( "JMSRouter TimeToLive : " + timeToLive);
- }
- } catch (Throwable t) {
- throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
- }
- }
-
- private static class JMSSendQueueSetup {
- JmsSession jmsSession;
- Destination jmsDestination;
- MessageProducer jmsProducer;
- String destinationName;
- JmsConnectionPool pool;
-
- private JMSSendQueueSetup(String destinationName) throws NamingException, JMSException, ConnectionException, NamingContextException {
- this( destinationName, null, null, null, null, null, null );
- }
-
- private JMSSendQueueSetup(String destinationName,
- String jndiContextFactory,
- String jndiUrl,
- String jndiPkgPrefix,
- String connectionFactoryName,
- String securityPrincipal,
- String securityCredential) throws NamingException, JMSException, ConnectionException, NamingContextException {
-
- if ( jndiContextFactory == null )
- jndiContextFactory = Configuration.getJndiServerContextFactory();
- if ( jndiUrl == null )
- jndiUrl = Configuration.getJndiServerURL();
- if ( jndiPkgPrefix == null )
- jndiPkgPrefix = Configuration.getJndiServerPkgPrefix();
- if ( connectionFactoryName == null )
- connectionFactoryName = "ConnectionFactory";
-
- Properties environment = new Properties();
- environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiContextFactory);
- environment.setProperty(Context.PROVIDER_URL, jndiUrl);
- environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
- Context oCtx = NamingContextPool.getNamingContext(environment);
+ final Properties environment = getEnvironment() ;
+ try
+ {
+ final JmsSession jmsSession = pool.getSession();
try {
- pool = ( securityPrincipal != null ) ?
- JmsConnectionPoolContainer.getPool(environment, connectionFactoryName, securityPrincipal, securityCredential) :
- JmsConnectionPoolContainer.getPool(environment, connectionFactoryName );
-
- this.destinationName = destinationName;
-
- jmsSession = pool.getSession();
- boolean clean = true ;
+ Context oCtx = NamingContextPool.getNamingContext(environment);
try {
try {
- jmsDestination = (Destination) oCtx.lookup(destinationName);
+ jmsDestination = (Queue) oCtx.lookup(queueName);
} catch (NamingException ne) {
try {
oCtx = NamingContextPool.replaceNamingContext(oCtx, environment);
- jmsDestination = (Queue) oCtx.lookup(destinationName);
+ jmsDestination = (Queue) oCtx.lookup(queueName);
} catch (NamingException nex) {
//ActiveMQ
- jmsDestination = jmsSession.createQueue(destinationName);
+ jmsDestination = jmsSession.createQueue(queueName);
}
}
- jmsProducer = jmsSession.createProducer(jmsDestination);
- clean = false ;
+ final MessageProducer jmsProducer = jmsSession.createProducer(jmsDestination);
+ jmsProducer.close() ;
} finally {
- if (clean) {
- pool.closeSession(jmsSession) ;
- }
+ NamingContextPool.releaseNamingContext(oCtx) ;
}
} finally {
- NamingContextPool.releaseNamingContext(oCtx) ;
+ pool.closeSession(jmsSession) ;
}
- }
-
- public void setDeliveryMode(final int deliveryMode ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setDeliveryMode( deliveryMode );
- }
-
- public void setPriority(final int priority ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setPriority( priority );
- }
-
- public void setTimeToLive(final long ttl ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setTimeToLive( ttl );
- }
-
- private void close() {
- try {
- pool.closeSession(jmsSession);
- if (jmsProducer!=null) {
- jmsProducer.close();
- }
- } catch (Exception e) {
- logger.error("Unable to close JMS Queue Setup.", e);
- }
- }
+ }
+ catch (Throwable t)
+ {
+ throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
+ }
}
+
+ private Properties getEnvironment()
+ {
+ final Properties environment = new Properties();
+ environment.setProperty(Context.PROVIDER_URL, jndiUrl);
+ environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiContextFactory);
+ environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
+ return environment ;
+ }
+
+ protected void createQueueSetup( String queueName ) throws ConfigurationException
+ {
+ createQueueSetup(queueName, null, null, null, null, null, null);
+ }
+
protected void setJMSReplyTo( final Message jmsMessage, final org.jboss.soa.esb.message.Message esbMessage ) throws URISyntaxException, JMSException, NamingException, ConnectionException, NamingContextException
{
- Destination destination = (Destination) esbMessage.getProperties().getProperty( JMSPropertiesSetter.JMS_REPLY_TO );
- if ( destination != null )
+ EPR replyToEpr = esbMessage.getHeader().getCall().getReplyTo();
+ if( !( replyToEpr instanceof JMSEpr) )
+ return;
+
+ JMSEpr jmsEpr = (JMSEpr) replyToEpr;
+ String destinationType = jmsEpr.getDestinationType();
+ if ( destinationType.equals( JMSEpr.QUEUE_TYPE ))
{
- jmsMessage.setJMSReplyTo( (Destination) destination );
+ jmsReplyToName = jmsEpr.getDestinationName() ;
}
}
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2008-11-20 06:17:58 UTC (rev 23980)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2008-11-20 11:20:41 UTC (rev 23981)
@@ -166,9 +166,6 @@
// outbound is to simulate a new jms message that is about to leave the ESB.
TextMessageImpl outBoundJmsMessage = new TextMessageImpl();
router.setJMSReplyTo( outBoundJmsMessage, msg );
-
- Destination replyTo = outBoundJmsMessage.getJMSReplyTo();
- assertTrue( replyTo instanceof Queue );
}
finally
{
More information about the jboss-svn-commits mailing list