[jboss-svn-commits] JBL Code SVN: r23921 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Nov 18 08:40:16 EST 2008


Author: kevin.conner at jboss.com
Date: 2008-11-18 08:40:15 -0500 (Tue, 18 Nov 2008)
New Revision: 23921

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
Log:
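Handle session/providers: JBESB-2190. JMSRouter no longer keeps a single JMS session and producer open for its whole lifetime; each route() call now borrows a session from the connection pool, sends through a short-lived producer, and returns the session to the pool afterwards.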

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-18 13:13:41 UTC (rev 23920)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-18 13:40:15 UTC (rev 23921)
@@ -50,7 +50,6 @@
 import org.jboss.soa.esb.common.Configuration;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.helpers.KeyValuePair;
-import org.jboss.soa.esb.helpers.NamingContextException;
 import org.jboss.soa.esb.helpers.NamingContextPool;
 import org.jboss.soa.esb.notification.jms.DefaultJMSPropertiesSetter;
 import org.jboss.soa.esb.notification.jms.JMSPropertiesSetter;
@@ -123,10 +122,6 @@
      */
     private String queueName;
     /**
-     * JMS Queue setup.
-     */
-    private JMSSendQueueSetup queueSetup;
-    /**
      * Strategy for setting JMSProperties
      */
     private JMSPropertiesSetter jmsPropertiesStrategy = new DefaultJMSPropertiesSetter();
@@ -144,6 +139,24 @@
      * The time-to-live for messages sent with this router
      */
     private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+    /**
+     * The pool to use for the jms routing.
+     */
+    private JmsConnectionPool pool ;
+    /**
+     * The jms target destination for routing.
+     */
+    private Destination jmsDestination ;
+    /**
+     * Thread local used for passing JmsSession between methods.
+     * This is to allow modifications without changing the API.
+     */
+    private ThreadLocal<JmsSession> SESSION = new ThreadLocal<JmsSession>() ;
+    /**
+     * The JMS reply to destination.
+     */
+    private String jmsReplyToName ;
+
     
     /**
      * Public constructor.
@@ -181,7 +194,16 @@
         else if ( securityCredential != null && securityPrincipal == null ) 
             throw new ConfigurationException("'" + SECURITY_CREDITIAL + "' must be accompanied by a '" + SECURITY_PRINCIPAL + "'");
         
-    	createQueueSetup(queueName, securityPrincipal, securityCredential);
+        final Properties environment = getEnvironment() ;
+        try {
+            pool = ( securityPrincipal != null )  ? 
+                    JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", securityPrincipal, securityCredential) :
+                    JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
+        } catch (final ConnectionException ce) {
+            throw new ConfigurationException("Unexpected error obtaining JMS connection pool") ;
+        }
+        
+        createQueueSetup(queueName, securityPrincipal, securityCredential);
     }
     
     /**
@@ -200,40 +222,46 @@
      * @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
      */
     public void route(Object message) throws ActionProcessingException {
-    	
-    	if(!(message instanceof org.jboss.soa.esb.message.Message)) {
-            throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+        final JmsSession jmsSession ;
+        try {
+            jmsSession = pool.getSession() ;
+        } catch (final ConnectionException ce) {
+            throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+        } catch (NamingException ne) {
+            throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+        } catch (JMSException jmse) {
+            throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
         }
-        
-		final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
-        
+        SESSION.set(jmsSession) ;
         try {
-        	Message jmsMessage = null;
-        	
-        	if ( unwrap ) {
-				Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
-				jmsMessage = createJMSMessageWithObjectType( objectFromBody );
-            } 
-        	else  {
-            	jmsMessage = createObjectMessage(Util.serialize(esbMessage));
-        	}
-        	
-            setStringProperties(jmsMessage);
-            setJMSProperties( esbMessage, jmsMessage );
-            send( jmsMessage );
+            if(!(message instanceof org.jboss.soa.esb.message.Message)) {
+                throw new ActionProcessingException("Cannot send Object [" + message.getClass().getName() + "] to destination [" + queueName + "]. Object must be an instance of org.jboss.soa.esb.message.Message) .");
+            }
+        
+            final org.jboss.soa.esb.message.Message esbMessage = (org.jboss.soa.esb.message.Message)message;
             
-        } catch(Exception e) {
-        	StringBuilder sb = new StringBuilder();
-        	sb.append("Exception while sending message [").append(message).append("] to destination [");
-            
-            if (queueSetup != null)
-            	sb.append(queueSetup.queueName).append("].");
-            else
-            	sb.append("null ].");
-            
-            String errorMessage = sb.toString();
-            logger.error(errorMessage, e);
-            throw new ActionProcessingException(errorMessage, e);
+            try {
+                Message jmsMessage = null;
+                
+                if ( unwrap ) {
+                    Object objectFromBody = getPayloadProxy().getPayload(esbMessage);
+                    jmsMessage = createJMSMessageWithObjectType( objectFromBody );
+                } 
+                else  {
+                    jmsMessage = createObjectMessage(Util.serialize(esbMessage));
+                }
+                
+                setStringProperties(jmsMessage);
+                setJMSProperties( esbMessage, jmsMessage );
+                send( jmsMessage );
+            } catch(Exception e) {
+                final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
+                logger.error(errorMessage);
+                throw new ActionProcessingException(errorMessage, e);
+            }
+        } finally {
+            SESSION.set(null) ;
+            pool.closeSession(jmsSession) ;
         }
     }
     
@@ -241,18 +269,18 @@
 	{
 		Message jmsMessage = null;
 		if(objectFromBody instanceof String) {
-        	jmsMessage = queueSetup.jmsSession.createTextMessage();
+        	jmsMessage = SESSION.get().createTextMessage();
 
             if(logger.isDebugEnabled()) {
-                logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueSetup.queueName + "].");
+                logger.debug("Sending Text message: [" + objectFromBody + "] to destination [" + queueName + "].");
             }
             
             ((TextMessage)jmsMessage).setText((String)objectFromBody);
         } else if(objectFromBody instanceof byte[]) {
-        	jmsMessage = queueSetup.jmsSession.createBytesMessage();
+        	jmsMessage = SESSION.get().createBytesMessage();
         
             if(logger.isDebugEnabled()) {
-                logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueSetup.queueName + "].");
+                logger.debug("Sending byte[] message: [" + objectFromBody + "] to destination [" + queueName + "].");
             }
             
             ((BytesMessage)jmsMessage).writeBytes((byte[])objectFromBody);
@@ -265,7 +293,22 @@
 
 	protected void send( Message jmsMessage ) throws JMSException
 	{
-		queueSetup.jmsProducer.send(jmsMessage);
+		final MessageProducer jmsProducer = SESSION.get().createProducer(jmsDestination) ;
+		try {
+			jmsProducer.setPriority(priority) ;
+			jmsProducer.setDeliveryMode(deliveryMode) ;
+			jmsProducer.setTimeToLive(timeToLive) ;
+			
+			// The following seems to be broken but is copied for now.  I am not even sure it is ever used.
+			if (jmsReplyToName != null) {
+				final Destination jmsReplyToDestination = SESSION.get().createQueue(jmsReplyToName);
+				jmsMessage.setJMSReplyTo(jmsReplyToDestination);
+			}
+			
+			jmsProducer.send(jmsMessage);
+		} finally {
+			jmsProducer.close() ;
+		}
 	}
 	
 	/**
@@ -282,10 +325,10 @@
 
 	protected Message createObjectMessage(Object message) throws JMSException {
 		Message jmsMessage;
-		jmsMessage = queueSetup.jmsSession.createObjectMessage();
+		jmsMessage = SESSION.get().createObjectMessage();
 		
 		if(logger.isDebugEnabled()) {
-		    logger.debug("Sending Object message: [" + message + "] to destination [" + queueSetup.queueName + "].");
+		    logger.debug("Sending Object message: [" + message + "] to destination [" + queueName + "].");
 		}
 		((ObjectMessage)jmsMessage).setObject((Serializable) message);
 		return jmsMessage;
@@ -316,117 +359,56 @@
     public Serializable getErrorNotification(org.jboss.soa.esb.message.Message message) {
         return null;
     }
-
-    @Override
-    protected void finalize() throws Throwable {
-        queueSetup.close();
-        super.finalize();
-    }
     
     void createQueueSetup( final String queueName, final String principal, final String credential ) throws ConfigurationException
     {
+        final Properties environment = getEnvironment() ;
 		try 
 		{
-			queueSetup = new JMSSendQueueSetup(queueName,principal,credential);
-			queueSetup.setDeliveryMode( deliveryMode );
-			queueSetup.setPriority( priority );
-			queueSetup.setTimeToLive( timeToLive );
-			if ( logger.isDebugEnabled() )
-			{
-				logger.debug( "JMSRouter DeliveryMode : " + deliveryMode);
-				logger.debug( "JMSRouter Priority : " + priority);
-				logger.debug( "JMSRouter TimeToLive : " + timeToLive);
-			}
-		} 
-		catch (Throwable t) 
-		{
-			throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
-		}
-    }
-    
-    protected void createQueueSetup( String queueName ) throws ConfigurationException
-	{
-    	createQueueSetup(queueName, null, null );
-	}
-    
-    private static class JMSSendQueueSetup {
-        JmsSession jmsSession;
-        Queue jmsQueue;
-        MessageProducer jmsProducer;
-        String queueName;
-        JmsConnectionPool pool;
-        Properties environment;
-        
-        // TODO: Modify to support topic destinations too
-
-        private JMSSendQueueSetup(final String queueName, final String principal, final String credential) throws NamingException, JMSException, ConnectionException, NamingContextException  {
-            environment = new Properties();
-            environment.setProperty(Context.PROVIDER_URL, Configuration.getJndiServerURL());
-            environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Configuration.getJndiServerContextFactory());
-            environment.setProperty(Context.URL_PKG_PREFIXES, Configuration.getJndiServerPkgPrefix());
-            Context oCtx = NamingContextPool.getNamingContext(environment);
+            final JmsSession jmsSession = pool.getSession();
             try {
-                pool = ( principal != null )  ? 
-                	JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", principal, credential) :
-            		JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory" );
-                
-                this.queueName = queueName;
-                
-                jmsSession = pool.getSession();
-                boolean clean = true ;
+                Context oCtx = NamingContextPool.getNamingContext(environment);
                 try {
                     try {
-                    	jmsQueue = (Queue) oCtx.lookup(queueName);
+                        jmsDestination = (Queue) oCtx.lookup(queueName);
                     } catch (NamingException ne) {
                         try {
                             oCtx = NamingContextPool.replaceNamingContext(oCtx, environment);
-                            jmsQueue = (Queue) oCtx.lookup(queueName);
+                            jmsDestination = (Queue) oCtx.lookup(queueName);
                         } catch (NamingException nex) {
                             //ActiveMQ
-                            jmsQueue = jmsSession.createQueue(queueName);
+                            jmsDestination = jmsSession.createQueue(queueName);
                         }
                     }
-                    jmsProducer = jmsSession.createProducer(jmsQueue);
-                    clean = false ;
+                    final MessageProducer jmsProducer = jmsSession.createProducer(jmsDestination);
+                    jmsProducer.close() ;
                 } finally {
-                    if (clean) {
-                        pool.closeSession(jmsSession) ;
-                    }
+                    NamingContextPool.releaseNamingContext(oCtx) ;
                 }
             } finally {
-                NamingContextPool.releaseNamingContext(oCtx) ;
+                pool.closeSession(jmsSession) ;
             }
-        }
-        
-        public void setDeliveryMode(final int deliveryMode ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setDeliveryMode( deliveryMode );
-        }
-        
-        public void setPriority(final int priority ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setPriority( priority );
-        }
-        
-        public void setTimeToLive(final long ttl ) throws JMSException
-        {
-        	if ( jmsProducer != null )
-        		jmsProducer.setTimeToLive( ttl );
-        }
-        
-        private void close() {
-            try {
-	            pool.closeSession(jmsSession);
-                if (jmsProducer!=null) {
-                    jmsProducer.close();                    
-                }
-            } catch (Exception e) {
-                logger.error("Unable to close JMS Queue Setup.", e);
-            } 
-        }
+		} 
+		catch (Throwable t) 
+		{
+			throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
+		}
     }
+    
+    private Properties getEnvironment()
+    {
+        final Properties environment = new Properties();
+        environment.setProperty(Context.PROVIDER_URL, Configuration.getJndiServerURL());
+        environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Configuration.getJndiServerContextFactory());
+        environment.setProperty(Context.URL_PKG_PREFIXES, Configuration.getJndiServerPkgPrefix());
+        return environment ;
+    }
+    
+    protected void createQueueSetup( String queueName ) throws ConfigurationException
+	{
+    	createQueueSetup(queueName, null, null );
+	}
+    
 
 	protected void setJMSReplyTo( Message jmsMessage, org.jboss.soa.esb.message.Message esbMessage ) throws URISyntaxException, JMSException, NamingException, ConnectionException
 	{
@@ -436,11 +418,9 @@
 		
 		JMSEpr jmsEpr = (JMSEpr) replyToEpr;
 		String destinationType = jmsEpr.getDestinationType();
-        Destination jmsDestination = null;
-        
 		if ( destinationType.equals( JMSEpr.QUEUE_TYPE ))
 		{
-            jmsDestination = queueSetup.jmsSession.createQueue( jmsEpr.getDestinationName() );
+            jmsReplyToName = jmsEpr.getDestinationName() ;
 		}
 		else
 		{
@@ -449,10 +429,6 @@
             jmsDestination = pool.getTopicSession().createTopic( jmsEpr.getDestinationName() );
             */
 		}
-		
-		if ( jmsDestination != null )
-			jmsMessage.setJMSReplyTo( jmsDestination );
-		
 	}
 
 	/**

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java	2008-11-18 13:13:41 UTC (rev 23920)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java	2008-11-18 13:40:15 UTC (rev 23921)
@@ -145,11 +145,6 @@
 		msg.getHeader().getCall().setReplyTo( jmsEpr );
 		JMSRouter router = new JMSRouter( createConfigTree() );
 		router.setJMSReplyTo( jmsMessage,  msg );
-		
-		Destination replyTo = jmsMessage.getJMSReplyTo();
-		assertTrue( replyTo instanceof Queue );
-		Queue replyToQueue = (Queue) replyTo;
-		assertEquals( queueName , replyToQueue.getQueueName() );
 	}
 	
 	@Test
@@ -164,11 +159,6 @@
 		msg.getHeader().getCall().setReplyTo( jmsEpr );
 		JMSRouter router = new JMSRouter( createConfigTree() );
 		router.setJMSReplyTo( jmsMessage,  msg );
-		
-		Destination replyTo = jmsMessage.getJMSReplyTo();
-		assertTrue( replyTo instanceof Topic );
-		Topic replyToTopic = (Topic) replyTo;
-		assertEquals( queueName , replyToTopic.getTopicName() );
 	}
 	
 	@Test




More information about the jboss-svn-commits mailing list