[jboss-svn-commits] JBL Code SVN: r23981 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 20 06:20:41 EST 2008


Author: kevin.conner at jboss.com
Date: 2008-11-20 06:20:41 -0500 (Thu, 20 Nov 2008)
New Revision: 23981

Modified:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
Log:
Handle session/providers: JBESB-2190

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-20 06:17:58 UTC (rev 23980)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-20 11:20:41 UTC (rev 23981)
@@ -45,6 +45,7 @@
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.common.Configuration;
 import org.jboss.soa.esb.helpers.ConfigTree;
@@ -137,10 +138,6 @@
      */
     private String queueName;
     /**
-     * JMS Queue setup.
-     */
-    private JMSSendQueueSetup queueSetup;
-    /**
      * Strategy for setting JMSProperties
      */
     private final JMSPropertiesSetter jmsPropertiesStrategy ;
@@ -164,6 +161,25 @@
 	private String connectionFactory;
 
     /**
+     * The pool to use for the jms routing.
+     */
+    private JmsConnectionPool pool ;
+    /**
+     * The jms target destination for routing.
+     */
+    private Destination jmsDestination ;
+    /**
+     * Thread local used for passing JmsSession between methods.
+     * This is to allow modifications without changing the API.
+     */
+    private ThreadLocal<JmsSession> SESSION = new ThreadLocal<JmsSession>() ;
+    /**
+     * The JMS reply to destination.
+     */
+    private String jmsReplyToName ;
+
+    
+    /**
      * Sole public constructor.
      * 
      * @param propertiesTree Action properties.
@@ -192,10 +208,10 @@
         if ( ttlStr != null )
 	        timeToLive = Long.parseLong( ttlStr );
 
-        jndiContextFactory = properties.getAttribute( JMSEpr.JNDI_CONTEXT_FACTORY_TAG );
-        jndiUrl = properties.getAttribute( JMSEpr.JNDI_URL_TAG );
-        jndiPkgPrefix = properties.getAttribute( JMSEpr.JNDI_PKG_PREFIX_TAG );
-        connectionFactory = properties.getAttribute( JMSEpr.CONNECTION_FACTORY_TAG );
+        jndiContextFactory = properties.getAttribute( JMSEpr.JNDI_CONTEXT_FACTORY_TAG, Configuration.getJndiServerContextFactory());
+        jndiUrl = properties.getAttribute( JMSEpr.JNDI_URL_TAG, Configuration.getJndiServerURL());
+        jndiPkgPrefix = properties.getAttribute( JMSEpr.JNDI_PKG_PREFIX_TAG, Configuration.getJndiServerPkgPrefix());
+        connectionFactory = properties.getAttribute( JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
         
         final String propertyStrategy = properties.getAttribute(PROPERTY_STRATEGY) ;
         if (propertyStrategy == null) {
@@ -215,8 +231,17 @@
             throw new ConfigurationException("'" + SECURITY_PRINCIPAL + "' must be accompanied by a '" + SECURITY_CREDITIAL + "'");
         else if ( securityCredential != null && securityPrincipal == null ) 
             throw new ConfigurationException("'" + SECURITY_CREDITIAL + "' must be accompanied by a '" + SECURITY_PRINCIPAL + "'");
-
-        createQueueSetup( queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential );
+        
+        final Properties environment = getEnvironment() ;
+        try {
+            pool = ( securityPrincipal != null )  ? 
+                    JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", securityPrincipal, securityCredential) :
+                    JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
+        } catch (final ConnectionException ce) {
+            throw new ConfigurationException("Unexpected error obtaining JMS connection pool") ;
+        }
+        
+        createQueueSetup(queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential);
     }
 
     /**
@@ -235,41 +260,46 @@
      * @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
      */
     public void route(Object message) throws ActionProcessingException {
-
-    	if(!(message instanceof org.jboss.soa.esb.message.Message)) {
-            throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+        final JmsSession jmsSession ;
+        try {
+            jmsSession = pool.getSession() ;
+        } catch (final ConnectionException ce) {
+            throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+        } catch (NamingException ne) {
+            throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+        } catch (JMSException jmse) {
+            throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
         }
-
-		final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
-
+        SESSION.set(jmsSession) ;
         try {
-        	Message jmsMessage = null;
-
-        	if ( unwrap ) {
-				Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
-				jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+            if(!(message instanceof org.jboss.soa.esb.message.Message)) {
+                throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
             }
-        	else  {
-            	jmsMessage = createObjectMessage(Util.serialize(esbMessage));
-        	}
+        
+            final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
 
-            setStringProperties(jmsMessage);
-            setJMSProperties( esbMessage, jmsMessage );
-            setJMSReplyTo( jmsMessage, esbMessage );
-            send( jmsMessage );
-
-        } catch(Exception e) {
-        	StringBuilder sb = new StringBuilder();
-        	sb.append("Exception while sending message [").append(message).append("] to destination [");
-
-            if (queueSetup != null)
-            	sb.append(queueSetup.destinationName).append("].");
-            else
-            	sb.append("null ].");
-
-            String errorMessage = sb.toString();
-            logger.error(errorMessage, e);
-            throw new ActionProcessingException(errorMessage, e);
+            try {
+                Message jmsMessage = null;
+                
+                if ( unwrap ) {
+                    Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
+                    jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+                } 
+                else  {
+                    jmsMessage = createObjectMessage(Util.serialize(esbMessage));
+                }
+                
+                setStringProperties(jmsMessage);
+                setJMSProperties( esbMessage, jmsMessage );
+                send( jmsMessage );
+            } catch(Exception e) {
+                final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
+                logger.error(errorMessage);
+                throw new ActionProcessingException(errorMessage, e);
+            }
+        } finally {
+            SESSION.set(null) ;
+            pool.closeSession(jmsSession) ;
         }
     }
 
@@ -277,18 +307,18 @@
 	{
 		Message jmsMessage = null;
 		if(objectFromBody instanceof String) {
-        	jmsMessage = queueSetup.jmsSession.createTextMessage();
+        	jmsMessage = SESSION.get().createTextMessage();
 
             if(logger.isDebugEnabled()) {
-                logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueSetup.destinationName + "].");
+                logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueName + "].");
             }
 
             ((TextMessage)jmsMessage).setText((String)objectFromBody);
         } else if(objectFromBody instanceof byte[]) {
-        	jmsMessage = queueSetup.jmsSession.createBytesMessage();
+        	jmsMessage = SESSION.get().createBytesMessage();
 
             if(logger.isDebugEnabled()) {
-                logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueSetup.destinationName + "].");
+                logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueName + "].");
             }
 
             ((BytesMessage)jmsMessage).writeBytes((byte[])objectFromBody);
@@ -301,7 +331,22 @@
 
 	protected void send( Message jmsMessage ) throws JMSException
 	{
-		queueSetup.jmsProducer.send(jmsMessage);
+		final MessageProducer jmsProducer = SESSION.get().createProducer(jmsDestination) ;
+		try {
+			jmsProducer.setPriority(priority) ;
+			jmsProducer.setDeliveryMode(deliveryMode) ;
+			jmsProducer.setTimeToLive(timeToLive) ;
+			
+			// The following seems to be broken but is copied for now.  I am not even sure it is ever used.
+			if (jmsReplyToName != null) {
+				final Destination jmsReplyToDestination = SESSION.get().createQueue(jmsReplyToName);
+				jmsMessage.setJMSReplyTo(jmsReplyToDestination);
+			}
+			
+			jmsProducer.send(jmsMessage);
+		} finally {
+			jmsProducer.close() ;
+		}
 	}
 
 	/**
@@ -318,10 +363,10 @@
 
 	protected Message createObjectMessage(Object message) throws JMSException {
 		Message jmsMessage;
-		jmsMessage = queueSetup.jmsSession.createObjectMessage();
+		jmsMessage = SESSION.get().createObjectMessage();
 
 		if(logger.isDebugEnabled()) {
-		    logger.debug("Sending Object message: [" + message + "] to destination [" + queueSetup.destinationName + "].");
+		    logger.debug("Sending Object message: [" + message + "] to destination [" + queueName + "].");
 		}
 		((ObjectMessage)jmsMessage).setObject((Serializable) message);
 		return jmsMessage;
@@ -353,17 +398,6 @@
         return null;
     }
 
-    @Override
-    protected void finalize() throws Throwable {
-        queueSetup.close();
-        super.finalize();
-    }
-
-    protected void createQueueSetup( String queueName ) throws ConfigurationException
-    {
-    	createQueueSetup( queueName, null, null, null, null, null, null );
-    }
-
     protected void createQueueSetup( String queueName,
     		String jndiContextFactory,
     		String jndiUrl,
@@ -372,124 +406,65 @@
     		String securityPrincipal,
     		String securityCredential) throws ConfigurationException
 	{
-		try {
-			queueSetup = new JMSSendQueueSetup(queueName, jndiContextFactory, jndiUrl, jndiPkgPrefix, connectionFactory, securityPrincipal, securityCredential );
-			queueSetup.setDeliveryMode( deliveryMode );
-			queueSetup.setPriority( priority );
-			queueSetup.setTimeToLive( timeToLive );
-			if ( logger.isDebugEnabled() )
-			{
-				logger.debug( "JMSRouter DeliveryMode : " + deliveryMode);
-				logger.debug( "JMSRouter Priority : " + priority);
-				logger.debug( "JMSRouter TimeToLive : " + timeToLive);
-			}
-		} catch (Throwable t) {
-			throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
-		}
-	}
-
-    private static class JMSSendQueueSetup {
-        JmsSession jmsSession;
-        Destination jmsDestination;
-        MessageProducer jmsProducer;
-        String destinationName;
-        JmsConnectionPool pool;
-
-        private JMSSendQueueSetup(String destinationName) throws NamingException, JMSException, ConnectionException, NamingContextException  {
-        	this( destinationName, null, null, null, null, null, null );
-        }
-
-        private JMSSendQueueSetup(String destinationName,
-        		String jndiContextFactory,
-        		String jndiUrl,
-        		String jndiPkgPrefix,
-        		String connectionFactoryName,
-        		String securityPrincipal,
-        		String securityCredential) throws NamingException, JMSException, ConnectionException, NamingContextException  {
-
-        	if ( jndiContextFactory == null )
-                    jndiContextFactory = Configuration.getJndiServerContextFactory();
-            if ( jndiUrl == null )
-                    jndiUrl = Configuration.getJndiServerURL();
-            if ( jndiPkgPrefix == null )
-                    jndiPkgPrefix = Configuration.getJndiServerPkgPrefix();
-            if ( connectionFactoryName == null )
-            	connectionFactoryName = "ConnectionFactory";
-
-            Properties environment = new Properties();
-            environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiContextFactory);
-            environment.setProperty(Context.PROVIDER_URL, jndiUrl);
-            environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
-            Context oCtx = NamingContextPool.getNamingContext(environment);
+        final Properties environment = getEnvironment() ;
+		try 
+		{
+            final JmsSession jmsSession = pool.getSession();
             try {
-                pool = ( securityPrincipal != null )  ? 
-                	JmsConnectionPoolContainer.getPool(environment, connectionFactoryName, securityPrincipal, securityCredential) :
-            		JmsConnectionPoolContainer.getPool(environment, connectionFactoryName );
-                
-                this.destinationName = destinationName;
-                
-                jmsSession = pool.getSession();
-                boolean clean = true ;
+                Context oCtx = NamingContextPool.getNamingContext(environment);
                 try {
                     try {
-                    	jmsDestination = (Destination) oCtx.lookup(destinationName);
+                        jmsDestination = (Queue) oCtx.lookup(queueName);
                     } catch (NamingException ne) {
                         try {
                             oCtx = NamingContextPool.replaceNamingContext(oCtx, environment);
-                            jmsDestination = (Queue) oCtx.lookup(destinationName);
+                            jmsDestination = (Queue) oCtx.lookup(queueName);
                         } catch (NamingException nex) {
                             //ActiveMQ
-                            jmsDestination = jmsSession.createQueue(destinationName);
+                            jmsDestination = jmsSession.createQueue(queueName);
                         }
                     }
-                    jmsProducer = jmsSession.createProducer(jmsDestination);
-                    clean = false ;
+                    final MessageProducer jmsProducer = jmsSession.createProducer(jmsDestination);
+                    jmsProducer.close() ;
                 } finally {
-                    if (clean) {
-                        pool.closeSession(jmsSession) ;
-                    }
+                    NamingContextPool.releaseNamingContext(oCtx) ;
                 }
             } finally {
-                NamingContextPool.releaseNamingContext(oCtx) ;
+                pool.closeSession(jmsSession) ;
             }
-        }
-
-        public void setDeliveryMode(final int deliveryMode ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setDeliveryMode( deliveryMode );
-        }
-
-        public void setPriority(final int priority ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setPriority( priority );
-        }
-
-        public void setTimeToLive(final long ttl ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setTimeToLive( ttl );
-        }
-
-        private void close() {
-            try {
-	            pool.closeSession(jmsSession);
-                if (jmsProducer!=null) {
-                    jmsProducer.close();
-                }
-            } catch (Exception e) {
-                logger.error("Unable to close JMS Queue Setup.", e);
-            } 
-        }
+		} 
+		catch (Throwable t) 
+		{
+			throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
+		}
     }
+    
+    private Properties getEnvironment()
+    {
+        final Properties environment = new Properties();
+        environment.setProperty(Context.PROVIDER_URL, jndiUrl);
+        environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiContextFactory);
+        environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
+        return environment ;
+    }
+    
+    protected void createQueueSetup( String queueName ) throws ConfigurationException
+	{
+    	createQueueSetup(queueName, null, null, null, null, null, null);
+	}
+    
 
 	protected void setJMSReplyTo( final Message jmsMessage, final org.jboss.soa.esb.message.Message esbMessage ) throws URISyntaxException, JMSException, NamingException, ConnectionException, NamingContextException
 	{
-		Destination destination = (Destination) esbMessage.getProperties().getProperty( JMSPropertiesSetter.JMS_REPLY_TO );
-		if ( destination != null )
+		EPR replyToEpr = esbMessage.getHeader().getCall().getReplyTo();
+		if( !( replyToEpr instanceof JMSEpr) )
+			return;
+		
+		JMSEpr jmsEpr = (JMSEpr) replyToEpr;
+		String destinationType = jmsEpr.getDestinationType();
+		if ( destinationType.equals( JMSEpr.QUEUE_TYPE ))
 		{
-			jmsMessage.setJMSReplyTo( (Destination) destination );
+            jmsReplyToName = jmsEpr.getDestinationName() ;
 		}
 	}
 

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java	2008-11-20 06:17:58 UTC (rev 23980)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java	2008-11-20 11:20:41 UTC (rev 23981)
@@ -166,9 +166,6 @@
     		// outbound is to simulate a new jms message that is about to leave the ESB.
     		TextMessageImpl outBoundJmsMessage = new TextMessageImpl();
     		router.setJMSReplyTo( outBoundJmsMessage,  msg );
-
-    		Destination replyTo = outBoundJmsMessage.getJMSReplyTo();
-    		assertTrue( replyTo instanceof Queue );
 		}
 		finally
 		{




More information about the jboss-svn-commits mailing list