[jboss-svn-commits] JBL Code SVN: r27573 - in labs/jbossesb/trunk/product/rosetta: src/org/jboss/internal/soa/esb/rosetta/pooling and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Jul 5 07:18:05 EDT 2009
Author: kevin.conner at jboss.com
Date: 2009-07-05 07:18:05 -0400 (Sun, 05 Jul 2009)
New Revision: 27573
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
Log:
Support redelivery attempt for specific failures: JBESB-2680
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2009-07-05 11:12:11 UTC (rev 27572)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -41,6 +41,7 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.helpers.JmsComposer;
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionFailureException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
@@ -176,6 +177,26 @@
* if problems were encountered
*/
public boolean deliver(org.jboss.soa.esb.message.Message message) throws CourierException {
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+ }
+ }
+
+ private boolean internalDeliver(org.jboss.soa.esb.message.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
ObjectMessage msg;
if (null == message) {
@@ -206,11 +227,15 @@
// Set the JMS message from the ESB message...
try {
setJMSProperties(message, msg);
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ } catch (final IllegalStateException ise) {
+ throw ise ;
} catch (JMSException e) {
throw new CourierMarshalUnmarshalException("Failed to set JMS Message properties from ESB Message properties.", e);
}
- return deliver(msg);
+ return internalDeliver(msg);
}
/**
@@ -222,6 +247,26 @@
* if problems were encountered
*/
public boolean deliver(javax.jms.Message message) throws CourierException {
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalDeliver(message) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+ }
+ }
+
+ private boolean internalDeliver(javax.jms.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
if (_isReceiver) {
throw new CourierException("This is a read-only Courier");
}
@@ -257,6 +302,12 @@
return true;
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException e) {
if (!jmsConnectRetry(e))
throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
@@ -325,7 +376,7 @@
return true;
} // ________________________________
- private void createMessageProducer() throws CourierException, NamingContextException {
+ private void createMessageProducer() throws CourierException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
synchronized(this) {
if (_messageProducer == null) {
try {
@@ -367,6 +418,12 @@
}
}
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException ex) {
_logger.debug("Error from JMS system.", ex);
@@ -402,6 +459,26 @@
}
public javax.jms.Message pickupPayload(long millis) throws CourierException, CourierTimeoutException {
+ try {
+ return internalPickupPayload(millis) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ // fall through
+ } catch (final IllegalStateException ise) {
+ // fall through
+ }
+
+ cleanup() ;
+
+ try {
+ return internalPickupPayload(millis) ;
+ } catch (final JmsConnectionFailureException jcfe) {
+ throw new CourierTransportException("Caught exception during receive and could not reconnect! ", jcfe);
+ } catch (final IllegalStateException ise) {
+ throw new CourierTransportException("Caught exception during receive and could not reconnect! ", ise);
+ }
+ }
+
+ private javax.jms.Message internalPickupPayload(long millis) throws CourierException, CourierTimeoutException, JmsConnectionFailureException, IllegalStateException {
if (!_isReceiver)
throw new CourierException("This is an outgoing-only Courier");
if (millis < 1)
@@ -426,6 +503,12 @@
try {
jmsMessage = _messageConsumer.receive(millis);
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException e) {
if (!jmsConnectRetry(e))
throw new CourierTransportException("Caught exception during receive and could not reconnect! ",e);
@@ -486,7 +569,7 @@
esbPropertiesStrategy.setPropertiesFromJMSMessage(fromJMS, toESB);
}
- private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
+ private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
Context oJndiCtx = null;
synchronized(this) {
@@ -532,6 +615,12 @@
NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
}
+ catch (final JmsConnectionFailureException jcfe) {
+ throw jcfe ;
+ }
+ catch (final IllegalStateException ise) {
+ throw ise ;
+ }
catch (JMSException ex) {
_logger.debug("Error from JMS system.", ex);
Copied: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java (from rev 26902, labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java)
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.jms.JMSException;
+
+/**
+ * This exception is thrown from a JMS session/producer/consumer method when the
+ * underlying exception appears to have been closed. This will occur when the connection
+ * has not been closed through the standard JMS exception listener route.
+ *
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionFailureException extends JMSException
+{
+ /**
+ * The serial version UID of this exception
+ */
+ private static final long serialVersionUID = 4352490530798177236L;
+
+ /**
+ * Construct an jms connection failure exception with the specified message.
+ * @param message The exception message.
+ */
+ public JmsConnectionFailureException(final String message)
+ {
+ super(message) ;
+ }
+
+ /**
+ * Construct an jms connection failure exception with the specified message and associated cause.
+ * @param message The exception message.
+ * @param cause The associated cause.
+ */
+ public JmsConnectionFailureException(String message, Throwable cause)
+ {
+ this(message) ;
+ initCause(cause) ;
+ }
+}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-07-05 11:12:11 UTC (rev 27572)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -394,6 +394,26 @@
}
/**
+ * Handle exceptions that occur in the session or its child objects.
+ * @param jmsSession The jmsSession associated with the call.
+ * @param jmse The JMSException to check.
+ * @throws JMSException if the exception is to be overridden.
+ */
+ void handleException(final JmsSession jmsSession, final JMSException jmse)
+ throws JMSException
+ {
+ if (messagingConnectionFailure(jmse))
+ {
+ final JmsSessionPool sessionPool = findOwnerPool(jmsSession);
+
+ if(sessionPool != null) {
+ sessionPool.cleanSessionPool() ;
+ }
+ throw new JmsConnectionFailureException("The underlying exception appears to have failed", jmse) ;
+ }
+ }
+
+ /**
* Check the exception to see whether it indicates a connection failure in JBM
* @param jmse The current JMS Exception
* @return true if it suggests a failure, false otherwise.
@@ -453,22 +473,6 @@
}
/**
- * This method is called when the pool needs to cleaned. It closes all open sessions
- * and the connection.
- */
- private void cleanSessionPool()
- {
- for(JmsSessionPool sessionPool : sessionPools) {
- try {
- sessionPool.cleanSessionPool();
- } catch(Exception e) {
- logger.error("Exception while cleaning JmsSessionPool.", e);
- }
- }
- sessionPools.clear();
- }
-
- /**
* This method is called when the pool needs to destroyed. It closes all open sessions
* and the connection and removes it from the container's poolMap.
*/
@@ -799,6 +803,8 @@
{
// Sessions need to be created in this way because of an issue with JBM.
// See https://jira.jboss.org/jira/browse/JBESB-1799
+ final long currentID = id ;
+ final Connection currentConnection = jmsConnection ;
final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
public JmsSession call()
throws JMSException
@@ -806,9 +812,9 @@
final JmsSession session ;
if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)currentConnection).createXASession(), currentID);
} else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+ session = new JmsSession(JmsConnectionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID);
}
return session ;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2009-07-05 11:12:11 UTC (rev 27572)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -22,6 +22,10 @@
package org.jboss.internal.soa.esb.rosetta.pooling;
import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.HashSet;
import javax.jms.BytesMessage;
@@ -49,6 +53,10 @@
public class JmsSession implements Session
{
/**
+ * The connection pool.
+ */
+ private final JmsConnectionPool pool ;
+ /**
* The session delegate.
*/
private final Session session ;
@@ -80,16 +88,18 @@
/**
* Create the session wrapper.
+ * @param pool The pool associated with this session.
* @param session The session delegate.
* @param id The pool instance id.
* @param isJTA True if this tales part in a JTA transaction
* @throws JMSException
*/
- JmsSession(final Session session, final long id)
+ JmsSession(final JmsConnectionPool pool, final Session session, final long id)
throws JMSException
{
+ this.pool = pool ;
this.id = id ;
- this.session = session ;
+ this.session = (Session)getExceptionHandler(pool, Session.class, session) ;
acknowledgeMode = session.getAcknowledgeMode() ;
// Workaround for JBESB-1873
if ("org.jboss.jms.client.JBossSession".equals(session.getClass().getName()))
@@ -290,7 +300,7 @@
}
final QueueBrowser result = getQueueBrowser(queueBrowser) ;
queueBrowserSet.add(result) ;
- return result ;
+ return (QueueBrowser)getExceptionHandler(pool, QueueBrowser.class, result) ;
}
private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer)
@@ -303,7 +313,7 @@
}
final MessageConsumer result = getMessageConsumer(messageConsumer) ;
messageConsumerSet.add(result) ;
- return result ;
+ return (MessageConsumer)getExceptionHandler(pool, MessageConsumer.class, result) ;
}
private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber)
@@ -316,7 +326,7 @@
}
final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ;
messageConsumerSet.add(result) ;
- return result ;
+ return (TopicSubscriber)getExceptionHandler(pool, TopicSubscriber.class, result) ;
}
private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer)
@@ -329,7 +339,7 @@
}
final MessageProducer result = getMessageProducer(messageProducer) ;
messageProducerSet.add(result) ;
- return result ;
+ return (MessageProducer)getExceptionHandler(pool, MessageProducer.class, result) ;
}
synchronized void releaseResources()
@@ -410,4 +420,62 @@
{
return suspect ;
}
+
+ /**
+ * Wrap the object in an exception handler.
+ * @param pool The pool associated with this session.
+ * @param instanceClass The interface type of the instance.
+ * @param instance The instance
+ * @return
+ */
+ protected Object getExceptionHandler(final JmsConnectionPool pool,
+ final Class<?> instanceClass, final Object instance)
+ {
+ final InvocationHandler handler = new ExceptionHandler(pool, instance) ;
+ return Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {instanceClass}, handler);
+ }
+
+ /**
+ * Handler responsible for intercepting JMS exceptions and checking for unreported closure.
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+ private final class ExceptionHandler implements InvocationHandler
+ {
+ /**
+ * The connection pool.
+ */
+ private final JmsConnectionPool pool ;
+ /**
+ * The target instance.
+ */
+ private final Object target ;
+
+ /**
+ * Construct the handler using the specified target.
+ * @param pool The associated connection pool.
+ * @param target The target instance.
+ */
+ public ExceptionHandler(final JmsConnectionPool pool, final Object target)
+ {
+ this.pool = pool ;
+ this.target = target ;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ try
+ {
+ return method.invoke(target, args);
+ }
+ catch (final InvocationTargetException ite)
+ {
+ final Throwable th = ite.getCause() ;
+ if (th instanceof JMSException)
+ {
+ pool.handleException(JmsSession.this, (JMSException)th) ;
+ }
+ throw th ;
+ }
+ }
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2009-07-05 11:12:11 UTC (rev 27572)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -79,9 +79,9 @@
JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
throws JMSException
{
- super(session, id) ;
+ super(pool, session, id) ;
this.pool = pool ;
- this.session = session ;
+ this.session = (XASession)getExceptionHandler(pool, XASession.class, session) ;
}
@Override
Copied: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java (from rev 26902, labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java)
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java 2009-07-05 11:18:05 UTC (rev 27573)
@@ -0,0 +1,664 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.Context;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.common.TransactionStrategy.NullTransactionStrategy;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockejb.jms.MockQueue;
+import org.mockejb.jms.MockTopic;
+import org.mockejb.jndi.MockContextFactory;
+
+/**
+ * Unit tests for handling JBM specific error conditions in JmsConnectionPool
+ *
+ * @author <a href='kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionPoolUnitTest
+{
+ private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
+ private static final String QUEUE_NAME = "testQueue" ;
+ private static final String TOPIC_NAME = "testTopic" ;
+
+ @Before
+ public void setUp()
+ throws Exception
+ {
+ MockContextFactory.setAsInitial();
+
+ final Context ctx = NamingContextPool.getNamingContext(null);
+ try
+ {
+ ctx.rebind(CONNECTION_FACTORY, new MockXAConnectionFactory());
+ }
+ finally
+ {
+ NamingContextPool.releaseNamingContext(ctx) ;
+ }
+ System.setProperty(Environment.JNDI_SERVER_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY)) ;
+ }
+
+ @After
+ public void tearDown()
+ throws Exception
+ {
+ MockContextFactory.revertSetAsInitial();
+ }
+
+ @Test
+ public void testSessionRepeatableAcquire()
+ throws Exception
+ {
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+ Assert.assertSame("Same session returned", session, session2) ;
+ }
+
+ @Test
+ public void testSessionRetry()
+ throws Exception
+ {
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ MockConnectionInvocationHandler.throwFault = true ;
+ final JmsSession session ;
+ try
+ {
+ session = pool.getSession() ;
+ Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault) ;
+ }
+ finally
+ {
+ MockConnectionInvocationHandler.throwFault = false ;
+ }
+
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+ pool.closeSession(session) ;
+ }
+
+ @Test
+ public void testSessionQueueBrowserRetry()
+ throws Exception
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+
+ final QueueBrowser queueBrowser = session.createBrowser(queue) ;
+ queueBrowser.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createBrowser(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+
+ @Test
+ public void testSessionMessageConsumerRetry()
+ throws Exception
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+
+ final MessageConsumer messageConsumer = session.createConsumer(queue) ;
+ messageConsumer.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createConsumer(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+
+ @Test
+ public void testSessionTopicSubscriberRetry()
+ throws Exception
+ {
+ final Topic topic = new MockTopic(TOPIC_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+
+ final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+ topicSubscriber.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+
+ @Test
+ public void testSessionMessageProducerRetry()
+ throws Exception
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+
+ final MessageProducer messageProducer = session.createProducer(queue) ;
+ messageProducer.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createProducer(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+
+ @Test
+ public void testXASessionRetry()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+ try
+ {
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ MockConnectionInvocationHandler.throwFault = true ;
+ final JmsSession session ;
+ try
+ {
+ session = pool.getSession() ;
+ Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault) ;
+ }
+ finally
+ {
+ MockConnectionInvocationHandler.throwFault = false ;
+ }
+
+ Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+ pool.closeSession(session) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
+ @Test
+ public void testXASessionQueueBrowserRetry()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+ try
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+
+ final QueueBrowser queueBrowser = session.createBrowser(queue) ;
+ queueBrowser.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createBrowser(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
+ @Test
+ public void testXASessionMessageConsumerRetry()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+ try
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+
+ final MessageConsumer messageConsumer = session.createConsumer(queue) ;
+ messageConsumer.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createConsumer(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
+ @Test
+ public void testXASessionTopicSubscriberRetry()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+ try
+ {
+ final Topic topic = new MockTopic(TOPIC_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+
+ final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+ topicSubscriber.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
+ @Test
+ public void testXASessionMessageProducerRetry()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+ try
+ {
+ final Queue queue = new MockQueue(QUEUE_NAME) ;
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+ final JmsSession session = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+
+ final MessageProducer messageProducer = session.createProducer(queue) ;
+ messageProducer.close() ;
+
+ MockSessionInvocationHandler.throwFault = true ;
+ try
+ {
+ session.createProducer(queue) ;
+ Assert.fail("Expected JmsConnectionFailureException") ;
+ }
+ catch (final JmsConnectionFailureException jmse) {} // expected
+ finally
+ {
+ MockSessionInvocationHandler.throwFault = false ;
+ }
+
+ pool.closeSession(session) ;
+
+ final JmsSession session2 = pool.getSession() ;
+ Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+ Assert.assertNotSame("Session class", session, session2) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
+ private Map<String, String> getPoolEnv()
+ {
+ final Map<String, String> env = new HashMap<String, String>() ;
+ env.put(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
+ return env ;
+ }
+
+ static class MockXAConnectionFactory implements XAConnectionFactory, ConnectionFactory
+ {
+ public XAConnection createXAConnection()
+ throws JMSException
+ {
+ return (XAConnection)Proxy.newProxyInstance(MockXAConnectionFactory.class.getClassLoader(), new Class[] {XAConnection.class},
+ new MockConnectionInvocationHandler()) ;
+ }
+
+ public XAConnection createXAConnection(final String user, final String password)
+ throws JMSException
+ {
+ return createXAConnection() ;
+ }
+
+ public Connection createConnection()
+ throws JMSException
+ {
+ return (Connection)Proxy.newProxyInstance(MockXAConnectionFactory.class.getClassLoader(), new Class[] {Connection.class},
+ new MockConnectionInvocationHandler()) ;
+ }
+
+ public Connection createConnection(final String user, final String password)
+ throws JMSException
+ {
+ return createConnection() ;
+ }
+ }
+
+ static final class MockConnectionInvocationHandler implements InvocationHandler
+ {
+ private ExceptionListener exceptionListener ;
+ static boolean throwFault ;
+
+ MockConnectionInvocationHandler()
+ {
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("setExceptionListener".equals(methodName))
+ {
+ exceptionListener = (ExceptionListener)args[0] ;
+ return null ;
+ }
+ else if ("getExceptionListener".equals(methodName))
+ {
+ return exceptionListener ;
+ }
+ else if ("createSession".equals(methodName))
+ {
+ checkFault() ;
+ final Integer acknowledgeMode = (Integer)args[1] ;
+ return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {Session.class},
+ new MockSessionInvocationHandler(acknowledgeMode)) ;
+ }
+ else if ("createXASession".equals(methodName))
+ {
+ checkFault() ;
+ return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {XASession.class},
+ new MockSessionInvocationHandler(Session.SESSION_TRANSACTED)) ;
+ }
+ else
+ {
+ System.out.println("Connection method " + method.getName() + " called") ;
+ return null ;
+ }
+ }
+
+ void fireExceptionListener(final JMSException exception)
+ {
+ if (exceptionListener != null)
+ {
+ exceptionListener.onException(exception) ;
+ }
+ }
+
+ private void checkFault()
+ throws JMSException
+ {
+ if (throwFault)
+ {
+ final JMSException exception = new JMSException("Test exception") ;
+ exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
+ // clear it down to allow retry
+ throwFault = false ;
+ throw exception ;
+ }
+ }
+ }
+
+ static final class MockSessionInvocationHandler implements InvocationHandler
+ {
+ private final Integer acknowledgeMode ;
+ static boolean throwFault ;
+
+ MockSessionInvocationHandler(final Integer acknowledgeMode)
+ {
+ this.acknowledgeMode = acknowledgeMode ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("getAcknowledgeMode".equals(methodName))
+ {
+ return acknowledgeMode ;
+ }
+ else if ("createBrowser".equals(methodName))
+ {
+ checkFault() ;
+ return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {QueueBrowser.class},
+ new MockNullInvocationHandler()) ;
+ }
+ else if ("createConsumer".equals(methodName))
+ {
+ checkFault() ;
+ return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageConsumer.class},
+ new MockNullInvocationHandler()) ;
+ }
+ else if ("createDurableSubscriber".equals(methodName))
+ {
+ checkFault() ;
+ return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {TopicSubscriber.class},
+ new MockNullInvocationHandler()) ;
+ }
+ else if ("createProducer".equals(methodName))
+ {
+ checkFault() ;
+ return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageProducer.class},
+ new MockNullInvocationHandler()) ;
+ }
+ else
+ {
+ System.out.println("Session method " + method.getName() + " called") ;
+ return null ;
+ }
+ }
+
+ private void checkFault()
+ throws JMSException
+ {
+ if (throwFault)
+ {
+ final JMSException exception = new JMSException("Test exception") ;
+ exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
+ throw exception ;
+ }
+ }
+ }
+
+ static final class MockNullInvocationHandler implements InvocationHandler
+ {
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("hashCode".equals(methodName))
+ {
+ return System.identityHashCode(proxy) ;
+ }
+ else if ("equals".equals(methodName))
+ {
+ return proxy == args[0] ;
+ }
+ else
+ {
+ return null ;
+ }
+ }
+ }
+
+ private static final class MockActiveTransactionStrategy extends NullTransactionStrategy
+ {
+ @Override
+ public boolean isActive()
+ throws TransactionStrategyException
+ {
+ return true ;
+ }
+
+ @Override
+ public void registerSynchronization(final Synchronization sync)
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public void enlistResource(final XAResource resource)
+ throws TransactionStrategyException
+ {
+ }
+
+ @Override
+ public Object getTransaction()
+ throws TransactionStrategyException
+ {
+ return this ;
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(JmsConnectionPoolUnitTest.class);
+ }
+}
More information about the jboss-svn-commits
mailing list