[jboss-svn-commits] JBL Code SVN: r23983 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: tests/src/org/jboss/soa/esb/actions/routing and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 20 07:35:21 EST 2008
Author: kevin.conner at jboss.com
Date: 2008-11-20 07:35:21 -0500 (Thu, 20 Nov 2008)
New Revision: 23983
Added:
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java
Modified:
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
Log:
Handle reconnects within JMSRouter: JBESB-2191
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-20 11:41:57 UTC (rev 23982)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-11-20 12:35:21 UTC (rev 23983)
@@ -260,16 +260,32 @@
* @see org.jboss.soa.esb.actions.routing.AbstractRouter#route(java.lang.Object)
*/
public void route(Object message) throws ActionProcessingException {
- final JmsSession jmsSession ;
+ final JmsSession jmsSession = getJmsSession() ;
try {
- jmsSession = pool.getSession() ;
- } catch (final ConnectionException ce) {
- throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
- } catch (NamingException ne) {
- throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
- } catch (JMSException jmse) {
- throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
+ handleRouting(jmsSession, message) ;
+ } catch (final JMSException jmse) {
+ try {
+ if (jmsSession.getTransacted()) {
+ jmsSession.rollback() ;
+ throw new ActionProcessingException("Unexpected exception routing message", jmse) ;
+ } else {
+ // Try to acquire again
+ final JmsSession newJmsSession = getJmsSession() ;
+ try {
+ handleRouting(newJmsSession, message) ;
+ } finally {
+ pool.closeSession(newJmsSession) ;
+ }
+ }
+ } catch (final JMSException jmse2) {
+ throw new ActionProcessingException("Unexpected exception routing message", jmse) ;
+ }
+ } finally {
+ pool.closeSession(jmsSession) ;
}
+ }
+
+ private void handleRouting(final JmsSession jmsSession, Object message) throws JMSException, ActionProcessingException {
SESSION.set(jmsSession) ;
try {
if(!(message instanceof org.jboss.soa.esb.message.Message)) {
@@ -293,6 +309,8 @@
setJMSProperties( esbMessage, jmsMessage );
setJMSReplyTo( jmsMessage, esbMessage );
send( jmsMessage );
+ } catch (JMSException jmse) {
+ throw jmse ;
} catch(Exception e) {
final String errorMessage = "Exception while sending message [" + message + "] to destination [" + queueName + "]." ;
logger.error(errorMessage);
@@ -300,10 +318,21 @@
}
} finally {
SESSION.set(null) ;
- pool.closeSession(jmsSession) ;
}
}
+ private JmsSession getJmsSession() throws ActionProcessingException {
+ try {
+ return pool.getSession() ;
+ } catch (final ConnectionException ce) {
+ throw new ActionProcessingException("Unexpected ConnectionException acquiring JMS session", ce) ;
+ } catch (NamingException ne) {
+ throw new ActionProcessingException("Unexpected NamingException acquiring JMS session", ne) ;
+ } catch (JMSException jmse) {
+ throw new ActionProcessingException("Unexpected JMSException acquiring JMS session", jmse) ;
+ }
+ }
+
protected Message createJMSMessageWithObjectType( Object objectFromBody ) throws JMSException
{
Message jmsMessage = null;
Copied: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java (from rev 23969, labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java)
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JMSRouterUnitTest.java 2008-11-20 12:35:21 UTC (rev 23983)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.actions.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockejb.jms.MockQueue;
+import org.mockejb.jms.QueueConnectionFactoryImpl;
+import org.mockejb.jndi.MockContextFactory;
+
+/**
+ * JMSRouter unit tests.
+ *
+ * @author <a href="mailto:kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class JMSRouterUnitTest
+{
+ private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
+ private static final String QUEUE_NAME = "failQueue" ;
+
+ @Before
+ public void setUp()
+ throws Exception
+ {
+ MockContextFactory.setAsInitial();
+
+ final Context ctx = NamingContextPool.getNamingContext(null);
+ try
+ {
+ ctx.rebind(CONNECTION_FACTORY, new MockQueueConnectionFactory());
+ ctx.rebind(QUEUE_NAME, new MockQueue(QUEUE_NAME));
+ }
+ finally
+ {
+ NamingContextPool.releaseNamingContext(ctx) ;
+ }
+ System.setProperty(Environment.JNDI_SERVER_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY)) ;
+ }
+
+ @After
+ public void tearDown()
+ throws Exception
+ {
+ MockContextFactory.revertSetAsInitial();
+ }
+
+ @Test
+ public void testRetry()
+ throws Exception
+ {
+ final ConfigTree config = new ConfigTree("config") ;
+ config.setAttribute("jndiName", QUEUE_NAME);
+
+ final JMSRouter router = new JMSRouter(config) ;
+
+ assertEquals("Original connection count", 1, MockQueueConnectionFactory.queueConnectionCount) ;
+ assertEquals("Original session count", 1, MockQueueExceptionHandlerInvocationHandler.queueSessionCount) ;
+ assertEquals("Original producer count", 1, MockQueueSessionInvocationHandler.producerCount) ;
+
+ try
+ {
+ router.process(MessageFactory.getInstance().getMessage()) ;
+ fail("Expected to receive an ActionProcessingException") ;
+ }
+ catch (final ActionProcessingException ape) {} // expected
+
+ // Connection count changes as we fire the exception handler and this closes connections/sessions/producers
+ assertEquals("Original connection count", 2, MockQueueConnectionFactory.queueConnectionCount) ;
+ // session count should increment by one to reflect the second session created when handling the exception.
+ assertEquals("Original session count", 2, MockQueueExceptionHandlerInvocationHandler.queueSessionCount) ;
+ // One producer for setup, first failure and second retry.
+ assertEquals("Original producer count", 3, MockQueueSessionInvocationHandler.producerCount) ;
+ }
+
+ private static final class MockQueueConnectionFactory extends QueueConnectionFactoryImpl
+ {
+ static int queueConnectionCount ;
+
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ queueConnectionCount++ ;
+ return (QueueConnection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueConnection.class},
+ new MockQueueExceptionHandlerInvocationHandler(super.createQueueConnection())) ;
+ }
+ }
+
+ private static final class MockQueueExceptionHandlerInvocationHandler implements InvocationHandler
+ {
+ private final QueueConnection queueConnection ;
+ private ExceptionListener exceptionListener ;
+ static int queueSessionCount ;
+
+ MockQueueExceptionHandlerInvocationHandler(final QueueConnection queueConnection)
+ {
+ this.queueConnection = queueConnection ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("setExceptionListener".equals(methodName))
+ {
+ exceptionListener = (ExceptionListener)args[0] ;
+ return null ;
+ }
+ else if ("getExceptionListener".equals(methodName))
+ {
+ return exceptionListener ;
+ }
+ else
+ {
+ final Object response = method.invoke(queueConnection, args) ;
+ if (response instanceof QueueSession)
+ {
+ queueSessionCount++ ;
+ final QueueSession queueSession = (QueueSession)response ;
+ return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+ new MockQueueSessionInvocationHandler(this, queueSession)) ;
+ }
+ else
+ {
+ return response ;
+ }
+ }
+ }
+
+ void fireExceptionListener(final JMSException exception)
+ {
+ if (exceptionListener != null)
+ {
+ exceptionListener.onException(exception) ;
+ }
+ }
+ }
+
+ private static final class MockQueueSessionInvocationHandler implements InvocationHandler
+ {
+ private final MockQueueExceptionHandlerInvocationHandler exceptionHandler ;
+ private final QueueSession queueSession ;
+ static int producerCount ;
+
+ MockQueueSessionInvocationHandler(final MockQueueExceptionHandlerInvocationHandler exceptionHandler, final QueueSession queueSession)
+ {
+ this.exceptionHandler = exceptionHandler ;
+ this.queueSession = queueSession ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("recover".equals(methodName))
+ {
+ return null ;
+ }
+ else if ("createProducer".equals(methodName))
+ {
+ producerCount++ ;
+ return new MockFailMessageProducer(exceptionHandler) ;
+ }
+ else if ("getAcknowledgeMode".equals(methodName))
+ {
+ return Integer.valueOf(Session.AUTO_ACKNOWLEDGE) ;
+ }
+ else if ("getTransacted".equals(methodName))
+ {
+ return Boolean.FALSE ;
+ }
+ else
+ {
+ return method.invoke(queueSession, args) ;
+ }
+ }
+ }
+
+ private static final class MockFailMessageProducer implements MessageProducer
+ {
+ private final MockQueueExceptionHandlerInvocationHandler exceptionHandler ;
+
+ public MockFailMessageProducer(final MockQueueExceptionHandlerInvocationHandler exceptionHandler)
+ {
+ this.exceptionHandler = exceptionHandler ;
+ }
+
+ public void close() throws JMSException {}
+
+ public int getDeliveryMode()
+ throws JMSException
+ {
+ return 0;
+ }
+
+ public Destination getDestination()
+ throws JMSException
+ {
+ return null;
+ }
+
+ public boolean getDisableMessageID()
+ throws JMSException
+ {
+ return false;
+ }
+
+ public boolean getDisableMessageTimestamp()
+ throws JMSException
+ {
+ return false;
+ }
+
+ public int getPriority()
+ throws JMSException
+ {
+ return 0;
+ }
+
+ public long getTimeToLive()
+ throws JMSException
+ {
+ return 0;
+ }
+
+ public void send(Message arg0)
+ throws JMSException
+ {
+ exception() ;
+ }
+
+ public void send(Destination arg0, Message arg1)
+ throws JMSException
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
+
+ public void send(Message arg0, int arg1, int arg2, long arg3)
+ throws JMSException
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
+
+ private void exception()
+ throws JMSException
+ {
+ final JMSException exception = new JMSException("Deliberate send exception") ;
+ exceptionHandler.fireExceptionListener(exception) ;
+ throw exception ;
+ }
+
+ public void send(Destination arg0, Message arg1, int arg2, int arg3, long arg4)
+ throws JMSException
+ {
+ }
+
+ public void setDeliveryMode(int arg0) throws JMSException {}
+
+ public void setDisableMessageID(boolean arg0) throws JMSException {}
+
+ public void setDisableMessageTimestamp(boolean arg0) throws JMSException {}
+
+ public void setPriority(int arg0) throws JMSException {}
+
+ public void setTimeToLive(long arg0) throws JMSException {}
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(JMSRouterUnitTest.class);
+ }
+}
More information about the jboss-svn-commits
mailing list