[jboss-svn-commits] JBL Code SVN: r23983 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 20 07:35:21 EST 2008


Author: kevin.conner at jboss.com
Date: 2008-11-20 07:35:21 -0500 (Thu, 20 Nov 2008)
New Revision: 23983

Added:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java
Modified:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
Log:
Handle reconnects within JMSRouter: JBESB-2191

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-20 11:41:57 UTC (rev 23982)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-11-20 12:35:21 UTC (rev 23983)
@@ -260,16 +260,32 @@
      * @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
      */
     public void route(Object message) throws ActionProcessingException {
-        final JmsSession jmsSession ;
+        final JmsSession jmsSession = getJmsSession() ;
         try {
-            jmsSession = pool.getSession() ;
-        } catch (final ConnectionException ce) {
-            throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
-        } catch (NamingException ne) {
-            throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
-        } catch (JMSException jmse) {
-            throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
+            handleRouting(jmsSession, message) ;
+        } catch (final JMSException jmse) {
+            try {
+                if (jmsSession.getTransacted()) {
+                    jmsSession.rollback() ; // transacted session: roll back and fail, no retry is attempted
+                    throw new ActionProcessingException("Unexpected exception routing message", jmse) ;
+                } else {
+                    // Not transacted: acquire a fresh session from the pool and retry the routing once
+                    final JmsSession newJmsSession = getJmsSession() ;
+                    try {
+                        handleRouting(newJmsSession, message) ;
+                    } finally {
+                        pool.closeSession(newJmsSession) ; // retry session is released whether or not the retry succeeded
+                    }
+                }
+            } catch (final JMSException jmse2) {
+                throw new ActionProcessingException("Unexpected exception routing message", jmse) ; // NOTE(review): jmse2 (rollback/retry failure) is dropped, only the first failure is reported - confirm intended
+            }
+        } finally {
+            pool.closeSession(jmsSession) ; // the session acquired at the top is always released here
         }
+    }
+    
+    private void handleRouting(final JmsSession jmsSession, Object message) throws JMSException, ActionProcessingException {
         SESSION.set(jmsSession) ;
         try {
             if(!(message instanceof org.jboss.soa.esb.message.Message)) {
@@ -293,6 +309,8 @@
                 setJMSProperties( esbMessage, jmsMessage );
                 setJMSReplyTo( jmsMessage, esbMessage );
                 send( jmsMessage );
+            } catch (JMSException jmse) {
+                throw jmse ;
             } catch(Exception e) {
                 final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
                 logger.error(errorMessage);
@@ -300,10 +318,21 @@
             }
         } finally {
             SESSION.set(null) ;
-            pool.closeSession(jmsSession) ;
         }
     }
 
+    private JmsSession getJmsSession() throws ActionProcessingException { // Fetches a session from the pool; each checked acquisition failure is rethrown as ActionProcessingException with the cause attached
+        try {
+            return pool.getSession() ;
+        } catch (final ConnectionException ce) {
+            throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+        } catch (NamingException ne) {
+            throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+        } catch (JMSException jmse) {
+            throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
+        }
+    }
+    
     protected Message createJMSMessageWithObjectType( Object objectFromBody ) throws JMSException
 	{
 		Message jmsMessage = null;

Copied: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java (from rev 23969, labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java)
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java	2008-11-20 12:35:21 UTC (rev 23983)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.actions.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockejb.jms.MockQueue;
+import org.mockejb.jms.QueueConnectionFactoryImpl;
+import org.mockejb.jndi.MockContextFactory;
+
+/**
+ * JMSRouter unit tests.
+ * 
+ * @author <a href="mailto:kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class JMSRouterUnitTest
+{
+    private static final String CONNECTION_FACTORY = "ConnectionFactory" ; // JNDI name the mock connection factory is bound under in setUp()
+    private static final String QUEUE_NAME = "failQueue" ; // JNDI name of the MockQueue bound in setUp(); also used as the router's "jndiName" in testRetry()
+    
+    @Before
+    public void setUp() // Binds the mock connection factory and mock queue into JNDI before each test.
+        throws Exception
+    {
+        MockContextFactory.setAsInitial(); // presumably makes the mockejb JNDI provider the initial context factory; reverted in tearDown()
+        
+        final Context ctx = NamingContextPool.getNamingContext(null);
+        try
+        {
+            ctx.rebind(CONNECTION_FACTORY, new MockQueueConnectionFactory());
+            ctx.rebind(QUEUE_NAME, new MockQueue(QUEUE_NAME));
+        }
+        finally
+        {
+            NamingContextPool.releaseNamingContext(ctx) ; // always hand the context back to the pool
+        }
+        System.setProperty(Environment.JNDI_SERVER_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY)) ; // copy the current INITIAL_CONTEXT_FACTORY system property into the JNDI_SERVER_CONTEXT_FACTORY property
+    }
+    
+    @After
+    public void tearDown() // Runs after each test.
+        throws Exception
+    {
+        MockContextFactory.revertSetAsInitial(); // counterpart of the setAsInitial() call made in setUp()
+    }
+    
+    @Test
+    public void testRetry() // A failing send on a non-transacted session is retried once on a new session, then reported as ActionProcessingException.
+        throws Exception
+    {
+        final ConfigTree config = new ConfigTree("config") ;
+        config.setAttribute("jndiName", QUEUE_NAME);
+        
+        final JMSRouter router = new JMSRouter(config) ;
+        // Constructing the router is expected to create exactly one connection, session and producer.
+        assertEquals("Original connection count", 1, MockQueueConnectionFactory.queueConnectionCount) ;
+        assertEquals("Original session count", 1, MockQueueExceptionHandlerInvocationHandler.queueSessionCount) ;
+        assertEquals("Original producer count", 1, MockQueueSessionInvocationHandler.producerCount) ;
+        
+        try
+        {
+            router.process(MessageFactory.getInstance().getMessage()) ;
+            fail("Expected to receive an ActionProcessingException") ;
+        }
+        catch (final ActionProcessingException ape) {} // expected
+        
+        // Connection count changes as the mock fires the exception listener and this closes connections/sessions/producers
+        assertEquals("Connection count after failed send", 2, MockQueueConnectionFactory.queueConnectionCount) ;
+        // Session count should increment by one to reflect the second session created when handling the exception.
+        assertEquals("Session count after failed send", 2, MockQueueExceptionHandlerInvocationHandler.queueSessionCount) ;
+        // Three producers in total: one for setup, one for the first failed send and one for the retry.
+        assertEquals("Producer count after failed send", 3, MockQueueSessionInvocationHandler.producerCount) ;
+    }
+    
+    private static final class MockQueueConnectionFactory extends QueueConnectionFactoryImpl // Connection factory that counts created connections and wraps each one in a proxy.
+    {
+        static int queueConnectionCount ; // incremented on every createQueueConnection() call; never reset anywhere in this file
+        
+        @Override
+        public QueueConnection createQueueConnection() throws JMSException
+        {
+            queueConnectionCount++ ;
+            return (QueueConnection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueConnection.class},
+                    new MockQueueExceptionHandlerInvocationHandler(super.createQueueConnection())) ; // the handler delegates to the connection created by the superclass
+        }
+    }
+    
+    private static final class MockQueueExceptionHandlerInvocationHandler implements InvocationHandler // Proxy handler for a QueueConnection: keeps the ExceptionListener itself and wraps returned QueueSessions.
+    {
+        private final QueueConnection queueConnection ; // delegate for every call not handled below
+        private ExceptionListener exceptionListener ; // captured by setExceptionListener(); null until one is registered
+        static int queueSessionCount ; // number of QueueSession results returned by delegated calls; never reset anywhere in this file
+            
+        MockQueueExceptionHandlerInvocationHandler(final QueueConnection queueConnection)
+        {
+            this.queueConnection = queueConnection ;
+        }
+            
+        public Object invoke(final Object proxy, final Method method, final Object[] args) // Dispatches on the method name only; argument types are not inspected.
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("setExceptionListener".equals(methodName))
+            {
+                exceptionListener = (ExceptionListener)args[0] ; // stored here, not forwarded to the delegate
+                return null ;
+            }
+            else if ("getExceptionListener".equals(methodName))
+            {
+                return exceptionListener ;
+            }
+            else
+            {
+                final Object response = method.invoke(queueConnection, args) ; // NOTE(review): a delegate failure surfaces as InvocationTargetException, it is not unwrapped here
+                if (response instanceof QueueSession)
+                {
+                    queueSessionCount++ ;
+                    final QueueSession queueSession = (QueueSession)response ;
+                    return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+                            new MockQueueSessionInvocationHandler(this, queueSession)) ; // session proxy keeps a reference back to this handler
+                }
+                else
+                {
+                    return response ;
+                }
+            }
+        }
+        
+        void fireExceptionListener(final JMSException exception) // Passes the exception to the registered listener; does nothing when none is set.
+        {
+            if (exceptionListener != null)
+            {
+                exceptionListener.onException(exception) ;
+            }
+        }
+    }
+    
+    private static final class MockQueueSessionInvocationHandler implements InvocationHandler // Proxy handler for a QueueSession: stubs a few methods and hands out failing producers.
+    {
+        private final MockQueueExceptionHandlerInvocationHandler exceptionHandler ; // passed on to every producer created below
+        private final QueueSession queueSession ; // delegate for every call not handled below
+        static int producerCount ; // number of createProducer() calls across all session proxies; never reset anywhere in this file
+            
+        MockQueueSessionInvocationHandler(final MockQueueExceptionHandlerInvocationHandler exceptionHandler, final QueueSession queueSession)
+        {
+            this.exceptionHandler = exceptionHandler ;
+            this.queueSession = queueSession ;
+        }
+            
+        public Object invoke(final Object proxy, final Method method, final Object[] args) // Dispatches on the method name only; argument types are not inspected.
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("recover".equals(methodName))
+            {
+                return null ; // recover() is a no-op on this mock
+            }
+            else if ("createProducer".equals(methodName))
+            {
+                producerCount++ ;
+                return new MockFailMessageProducer(exceptionHandler) ; // the delegate session is not asked for a producer
+            }
+            else if ("getAcknowledgeMode".equals(methodName))
+            {
+                return Integer.valueOf(Session.AUTO_ACKNOWLEDGE) ;
+            }
+            else if ("getTransacted".equals(methodName))
+            {
+                return Boolean.FALSE ; // non-transacted, so JMSRouter.route() takes its retry branch instead of rolling back
+            }
+            else
+            {
+                return method.invoke(queueSession, args) ; // NOTE(review): a delegate failure surfaces as InvocationTargetException, it is not unwrapped here
+            }
+        }
+    }
+    
+    private static final class MockFailMessageProducer implements MessageProducer // MessageProducer stub whose send(Message) notifies the exception listener and then throws.
+    {
+        private final MockQueueExceptionHandlerInvocationHandler exceptionHandler ; // used to reach the listener registered on the connection proxy
+        
+        public MockFailMessageProducer(final MockQueueExceptionHandlerInvocationHandler exceptionHandler)
+        {
+            this.exceptionHandler = exceptionHandler ;
+        }
+        
+        public void close() throws JMSException {} // no-op
+
+        public int getDeliveryMode() // fixed stub value
+            throws JMSException
+        {
+            return 0;
+        }
+
+        public Destination getDestination() // fixed stub value
+            throws JMSException
+        {
+            return null;
+        }
+
+        public boolean getDisableMessageID() // fixed stub value
+            throws JMSException
+        {
+            return false;
+        }
+
+        public boolean getDisableMessageTimestamp() // fixed stub value
+            throws JMSException
+        {
+            return false;
+        }
+
+        public int getPriority() // fixed stub value
+            throws JMSException
+        {
+            return 0;
+        }
+
+        public long getTimeToLive() // fixed stub value
+            throws JMSException
+        {
+            return 0;
+        }
+
+        public void send(Message arg0) // always fails: notifies the exception listener first, then throws
+            throws JMSException
+        {
+            exception() ;
+        }
+
+        public void send(Destination arg0, Message arg1) // always throws; NOTE(review): does not notify the exception listener, unlike send(Message)
+            throws JMSException
+        {
+            throw new JMSException("Deliberate send exception") ;
+        }
+
+        public void send(Message arg0, int arg1, int arg2, long arg3) // always throws; NOTE(review): does not notify the exception listener, unlike send(Message)
+            throws JMSException
+        {
+            throw new JMSException("Deliberate send exception") ;
+        }
+
+        private void exception() // builds the failure, hands it to the exception listener (if any), then throws the same instance
+            throws JMSException
+        {
+            final JMSException exception = new JMSException("Deliberate send exception") ;
+            exceptionHandler.fireExceptionListener(exception) ;
+            throw exception ;
+        }
+        
+        public void send(Destination arg0, Message arg1, int arg2, int arg3, long arg4) // NOTE(review): the only send overload that silently succeeds - confirm intended
+            throws JMSException
+        {
+        }
+
+        public void setDeliveryMode(int arg0) throws JMSException {} // no-op, value is ignored
+
+        public void setDisableMessageID(boolean arg0) throws JMSException {} // no-op, value is ignored
+
+        public void setDisableMessageTimestamp(boolean arg0) throws JMSException {} // no-op, value is ignored
+
+        public void setPriority(int arg0) throws JMSException {} // no-op, value is ignored
+
+        public void setTimeToLive(long arg0) throws JMSException {} // no-op, value is ignored
+    }
+    
+    public static junit.framework.Test suite() // Exposes this JUnit 4 class as a junit.framework.Test by wrapping it in a JUnit4TestAdapter.
+    {
+        return new JUnit4TestAdapter(JMSRouterUnitTest.class);
+    }
+}




More information about the jboss-svn-commits mailing list