[jboss-svn-commits] JBL Code SVN: r23921 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Nov 18 08:40:16 EST 2008
Author: kevin.conner at jboss.com
Date: 2008-11-18 08:40:15 -0500 (Tue, 18 Nov 2008)
New Revision: 23921
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
Log:
Handle session/providers: JBESB-2190
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-18 13:13:41 UTC (rev 23920)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-18 13:40:15 UTC (rev 23921)
@@ -50,7 +50,6 @@
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.KeyValuePair;
-import org.jboss.soa.esb.helpers.NamingContextException;
import org.jboss.soa.esb.helpers.NamingContextPool;
import org.jboss.soa.esb.notification.jms.DefaultJMSPropertiesSetter;
import org.jboss.soa.esb.notification.jms.JMSPropertiesSetter;
@@ -123,10 +122,6 @@
*/
private String queueName;
/**
- * JMS Queue setup.
- */
- private JMSSendQueueSetup queueSetup;
- /**
* Strategy for setting JMSProperties
*/
private JMSPropertiesSetter jmsPropertiesStrategy = new DefaultJMSPropertiesSetter();
@@ -144,6 +139,24 @@
* The time-to-live for messages sent with this router
*/
private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ /**
+ * The pool to use for the jms routing.
+ */
+ private JmsConnectionPool pool ;
+ /**
+ * The jms target destination for routing.
+ */
+ private Destination jmsDestination ;
+ /**
+ * Thread local used for passing JmsSession between methods.
+ * This is to allow modifications without changing the API.
+ */
+ private ThreadLocal<JmsSession> SESSION = new ThreadLocal<JmsSession>() ;
+ /**
+ * The JMS reply to destination.
+ */
+ private String jmsReplyToName ;
+
/**
* Public constructor.
@@ -181,7 +194,16 @@
else if ( securityCredential != null && securityPrincipal == null )
throw new ConfigurationException("'" + SECURITY_CREDITIAL + "' must be accompanied by a '" + SECURITY_PRINCIPAL + "'");
- createQueueSetup(queueName, securityPrincipal, securityCredential);
+ final Properties environment = getEnvironment() ;
+ try {
+ pool = ( securityPrincipal != null ) ?
+ JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", securityPrincipal, securityCredential) :
+ JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
+ } catch (final ConnectionException ce) {
+ throw new ConfigurationException("Unexpected error obtaining JMS connection pool") ;
+ }
+
+ createQueueSetup(queueName, securityPrincipal, securityCredential);
}
/**
@@ -200,40 +222,46 @@
* @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
*/
public void route(Object message) throws ActionProcessingException {
-
- if(!(message instanceof org.jboss.soa.esb.message.Message)) {
- throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+ final JmsSession jmsSession ;
+ try {
+ jmsSession = pool.getSession() ;
+ } catch (final ConnectionException ce) {
+ throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+ } catch (NamingException ne) {
+ throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+ } catch (JMSException jmse) {
+ throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
}
-
- final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
-
+ SESSION.set(jmsSession) ;
try {
- Message jmsMessage = null;
-
- if ( unwrap ) {
- Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
- jmsMessage = createJMSMessageWithObjectType( objectFromBody );
- }
- else {
- jmsMessage = createObjectMessage(Util.serialize(esbMessage));
- }
-
- setStringProperties(jmsMessage);
- setJMSProperties( esbMessage, jmsMessage );
- send( jmsMessage );
+ if(!(message instanceof org.jboss.soa.esb.message.Message)) {
+ throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+ }
+
+ final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
- } catch(Exception e) {
- StringBuilder sb = new StringBuilder();
- sb.append("Exception while sending message [").append(message).append("] to destination [");
-
- if (queueSetup != null)
- sb.append(queueSetup.queueName).append("].");
- else
- sb.append("null ].");
-
- String errorMessage = sb.toString();
- logger.error(errorMessage, e);
- throw new ActionProcessingException(errorMessage, e);
+ try {
+ Message jmsMessage = null;
+
+ if ( unwrap ) {
+ Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
+ jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+ }
+ else {
+ jmsMessage = createObjectMessage(Util.serialize(esbMessage));
+ }
+
+ setStringProperties(jmsMessage);
+ setJMSProperties( esbMessage, jmsMessage );
+ send( jmsMessage );
+ } catch(Exception e) {
+ final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
+ logger.error(errorMessage);
+ throw new ActionProcessingException(errorMessage, e);
+ }
+ } finally {
+ SESSION.set(null) ;
+ pool.closeSession(jmsSession) ;
}
}
@@ -241,18 +269,18 @@
{
Message jmsMessage = null;
if(objectFromBody instanceof String) {
- jmsMessage = queueSetup.jmsSession.createTextMessage();
+ jmsMessage = SESSION.get().createTextMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueSetup.queueName + "].");
+ logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueName + "].");
}
((TextMessage)jmsMessage).setText((String)objectFromBody);
} else if(objectFromBody instanceof byte[]) {
- jmsMessage = queueSetup.jmsSession.createBytesMessage();
+ jmsMessage = SESSION.get().createBytesMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueSetup.queueName + "].");
+ logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueName + "].");
}
((BytesMessage)jmsMessage).writeBytes((byte[])objectFromBody);
@@ -265,7 +293,22 @@
protected void send( Message jmsMessage ) throws JMSException
{
- queueSetup.jmsProducer.send(jmsMessage);
+ final MessageProducer jmsProducer = SESSION.get().createProducer(jmsDestination) ;
+ try {
+ jmsProducer.setPriority(priority) ;
+ jmsProducer.setDeliveryMode(deliveryMode) ;
+ jmsProducer.setTimeToLive(timeToLive) ;
+
+ // The following seems to be broken but is copied for now. I am not even sure it is ever used.
+ if (jmsReplyToName != null) {
+ final Destination jmsReplyToDestination = SESSION.get().createQueue(jmsReplyToName);
+ jmsMessage.setJMSReplyTo(jmsReplyToDestination);
+ }
+
+ jmsProducer.send(jmsMessage);
+ } finally {
+ jmsProducer.close() ;
+ }
}
/**
@@ -282,10 +325,10 @@
protected Message createObjectMessage(Object message) throws JMSException {
Message jmsMessage;
- jmsMessage = queueSetup.jmsSession.createObjectMessage();
+ jmsMessage = SESSION.get().createObjectMessage();
if(logger.isDebugEnabled()) {
- logger.debug("Sending Object message: [" + message + "] to destination [" + queueSetup.queueName + "].");
+ logger.debug("Sending Object message: [" + message + "] to destination [" + queueName + "].");
}
((ObjectMessage)jmsMessage).setObject((Serializable) message);
return jmsMessage;
@@ -316,117 +359,56 @@
public Serializable getErrorNotification(org.jboss.soa.esb.message.Message message) {
return null;
}
-
- @Override
- protected void finalize() throws Throwable {
- queueSetup.close();
- super.finalize();
- }
void createQueueSetup( final String queueName, final String principal, final String credential ) throws ConfigurationException
{
+ final Properties environment = getEnvironment() ;
try
{
- queueSetup = new JMSSendQueueSetup(queueName,principal,credential);
- queueSetup.setDeliveryMode( deliveryMode );
- queueSetup.setPriority( priority );
- queueSetup.setTimeToLive( timeToLive );
- if ( logger.isDebugEnabled() )
- {
- logger.debug( "JMSRouter DeliveryMode : " + deliveryMode);
- logger.debug( "JMSRouter Priority : " + priority);
- logger.debug( "JMSRouter TimeToLive : " + timeToLive);
- }
- }
- catch (Throwable t)
- {
- throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
- }
- }
-
- protected void createQueueSetup( String queueName ) throws ConfigurationException
- {
- createQueueSetup(queueName, null, null );
- }
-
- private static class JMSSendQueueSetup {
- JmsSession jmsSession;
- Queue jmsQueue;
- MessageProducer jmsProducer;
- String queueName;
- JmsConnectionPool pool;
- Properties environment;
-
- // TODO: Modify to support topic destinations too
-
- private JMSSendQueueSetup(final String queueName, final String principal, final String credential) throws NamingException, JMSException, ConnectionException, NamingContextException {
- environment = new Properties();
- environment.setProperty(Context.PROVIDER_URL, Configuration.getJndiServerURL());
- environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Configuration.getJndiServerContextFactory());
- environment.setProperty(Context.URL_PKG_PREFIXES, Configuration.getJndiServerPkgPrefix());
- Context oCtx = NamingContextPool.getNamingContext(environment);
+ final JmsSession jmsSession = pool.getSession();
try {
- pool = ( principal != null ) ?
- JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", principal, credential) :
- JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
-
- this.queueName = queueName;
-
- jmsSession = pool.getSession();
- boolean clean = true ;
+ Context oCtx = NamingContextPool.getNamingContext(environment);
try {
try {
- jmsQueue = (Queue) oCtx.lookup(queueName);
+ jmsDestination = (Queue) oCtx.lookup(queueName);
} catch (NamingException ne) {
try {
oCtx = NamingContextPool.replaceNamingContext(oCtx, environment);
- jmsQueue = (Queue) oCtx.lookup(queueName);
+ jmsDestination = (Queue) oCtx.lookup(queueName);
} catch (NamingException nex) {
//ActiveMQ
- jmsQueue = jmsSession.createQueue(queueName);
+ jmsDestination = jmsSession.createQueue(queueName);
}
}
- jmsProducer = jmsSession.createProducer(jmsQueue);
- clean = false ;
+ final MessageProducer jmsProducer = jmsSession.createProducer(jmsDestination);
+ jmsProducer.close() ;
} finally {
- if (clean) {
- pool.closeSession(jmsSession) ;
- }
+ NamingContextPool.releaseNamingContext(oCtx) ;
}
} finally {
- NamingContextPool.releaseNamingContext(oCtx) ;
+ pool.closeSession(jmsSession) ;
}
- }
-
- public void setDeliveryMode(final int deliveryMode ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setDeliveryMode( deliveryMode );
- }
-
- public void setPriority(final int priority ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setPriority( priority );
- }
-
- public void setTimeToLive(final long ttl ) throws JMSException
- {
- if ( jmsProducer != null )
- jmsProducer.setTimeToLive( ttl );
- }
-
- private void close() {
- try {
- pool.closeSession(jmsSession);
- if (jmsProducer!=null) {
- jmsProducer.close();
- }
- } catch (Exception e) {
- logger.error("Unable to close JMS Queue Setup.", e);
- }
- }
+ }
+ catch (Throwable t)
+ {
+ throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
+ }
}
+
+ private Properties getEnvironment()
+ {
+ final Properties environment = new Properties();
+ environment.setProperty(Context.PROVIDER_URL, Configuration.getJndiServerURL());
+ environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Configuration.getJndiServerContextFactory());
+ environment.setProperty(Context.URL_PKG_PREFIXES, Configuration.getJndiServerPkgPrefix());
+ return environment ;
+ }
+
+ protected void createQueueSetup( String queueName ) throws ConfigurationException
+ {
+ createQueueSetup(queueName, null, null );
+ }
+
protected void setJMSReplyTo( Message jmsMessage, org.jboss.soa.esb.message.Message esbMessage ) throws URISyntaxException, JMSException, NamingException, ConnectionException
{
@@ -436,11 +418,9 @@
JMSEpr jmsEpr = (JMSEpr) replyToEpr;
String destinationType = jmsEpr.getDestinationType();
- Destination jmsDestination = null;
-
if ( destinationType.equals( JMSEpr.QUEUE_TYPE ))
{
- jmsDestination = queueSetup.jmsSession.createQueue( jmsEpr.getDestinationName() );
+ jmsReplyToName = jmsEpr.getDestinationName() ;
}
else
{
@@ -449,10 +429,6 @@
jmsDestination = pool.getTopicSession().createTopic( jmsEpr.getDestinationName() );
*/
}
-
- if ( jmsDestination != null )
- jmsMessage.setJMSReplyTo( jmsDestination );
-
}
/**
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2008-11-18 13:13:41 UTC (rev 23920)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2008-11-18 13:40:15 UTC (rev 23921)
@@ -145,11 +145,6 @@
msg.getHeader().getCall().setReplyTo( jmsEpr );
JMSRouter router = new JMSRouter( createConfigTree() );
router.setJMSReplyTo( jmsMessage, msg );
-
- Destination replyTo = jmsMessage.getJMSReplyTo();
- assertTrue( replyTo instanceof Queue );
- Queue replyToQueue = (Queue) replyTo;
- assertEquals( queueName , replyToQueue.getQueueName() );
}
@Test
@@ -164,11 +159,6 @@
msg.getHeader().getCall().setReplyTo( jmsEpr );
JMSRouter router = new JMSRouter( createConfigTree() );
router.setJMSReplyTo( jmsMessage, msg );
-
- Destination replyTo = jmsMessage.getJMSReplyTo();
- assertTrue( replyTo instanceof Topic );
- Topic replyToTopic = (Topic) replyTo;
- assertEquals( queueName , replyToTopic.getTopicName() );
}
@Test
More information about the jboss-svn-commits mailing list