[jboss-svn-commits] JBL Code SVN: r26902 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: src/org/jboss/internal/soa/esb/rosetta/pooling and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Jun 10 08:13:02 EDT 2009


Author: kevin.conner at jboss.com
Date: 2009-06-10 08:13:02 -0400 (Wed, 10 Jun 2009)
New Revision: 26902

Added:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
Modified:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
Log:
Support redelivery attempt for specific failures: JBESB-2575

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2009-06-10 11:59:35 UTC (rev 26901)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -41,6 +41,7 @@
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.helpers.JmsComposer;
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionFailureException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
@@ -176,6 +177,26 @@
      *                          if problems were encountered
      */
     public boolean deliver(org.jboss.soa.esb.message.Message message) throws CourierException {
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+        }
+    }
+    
+    private boolean internalDeliver(org.jboss.soa.esb.message.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
         ObjectMessage msg;
 
         if (null == message) {
@@ -206,11 +227,15 @@
         // Set the JMS message from the ESB message...
         try {
             setJMSProperties(message, msg);
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw jcfe ;
+        } catch (final IllegalStateException ise) {
+            throw ise ;
         } catch (JMSException e) {
             throw new CourierMarshalUnmarshalException("Failed to set JMS Message properties from ESB Message properties.", e);
         }
 
-        return deliver(msg);
+        return internalDeliver(msg);
     }
 
     /**
@@ -222,6 +247,26 @@
      *                          if problems were encountered
      */
     public boolean deliver(javax.jms.Message message) throws CourierException {
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+        }
+    }
+    
+    private boolean internalDeliver(javax.jms.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
         if (_isReceiver) {
             throw new CourierException("This is a read-only Courier");
         }
@@ -257,6 +302,12 @@
                     
                     return true;
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException e) {
                     if (!jmsConnectRetry(e))
                         throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
@@ -325,7 +376,7 @@
         return true;
     } // ________________________________
 
-    private void createMessageProducer() throws CourierException, NamingContextException {
+    private void createMessageProducer() throws CourierException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
         synchronized(this) {
             if (_messageProducer == null) {
                 try {
@@ -367,6 +418,12 @@
                         }
                     }
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException ex) {
                     _logger.debug("Error from JMS system.", ex);
 
@@ -402,6 +459,26 @@
     }
 
     public javax.jms.Message pickupPayload(long millis) throws CourierException, CourierTimeoutException {
+        try {
+            return internalPickupPayload(millis) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalPickupPayload(millis) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during receive and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during receive and could not reconnect! ", ise);
+        }
+    }
+    
+    private javax.jms.Message internalPickupPayload(long millis) throws CourierException, CourierTimeoutException, JmsConnectionFailureException, IllegalStateException {
         if (!_isReceiver)
             throw new CourierException("This is an outgoing-only Courier");
         if (millis < 1)
@@ -426,6 +503,12 @@
                 try {
                     jmsMessage = _messageConsumer.receive(millis);
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException e) {
                     if (!jmsConnectRetry(e))
                         throw new CourierTransportException("Caught exception during receive and could not reconnect! ",e);
@@ -486,7 +569,7 @@
         esbPropertiesStrategy.setPropertiesFromJMSMessage(fromJMS, toESB);
     }
 
-    private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
+    private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
         Context oJndiCtx = null;
 
         synchronized(this) {
@@ -532,6 +615,12 @@
                         NamingContextPool.releaseNamingContext(oJndiCtx) ;
                     }
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException ex) {
                     _logger.debug("Error from JMS system.", ex);
 

Added: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.jms.JMSException;
+
+/**
+ * This exception is thrown from a JMS session/producer/consumer method when the
+ * underlying exception appears to have been closed.  This will occur when the connection
+ * has not been closed through the standard JMS exception listener route.
+ * 
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionFailureException extends JMSException
+{
+    /**
+     * The serial version UID of this exception
+     */
+    private static final long serialVersionUID = 4352490530798177236L;
+
+    /**
+     * Construct an jms connection failure exception with the specified message.
+     * @param message The exception message.
+     */
+    public JmsConnectionFailureException(final String message) 
+    {
+        super(message) ;
+    }
+
+    /**
+     * Construct an jms connection failure exception with the specified message and associated cause.
+     * @param message The exception message.
+     * @param cause The associated cause.
+     */
+    public JmsConnectionFailureException(String message, Throwable cause)
+    {
+        this(message) ;
+        initCause(cause) ;
+   }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2009-06-10 11:59:35 UTC (rev 26901)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -394,6 +394,26 @@
     }
     
     /**
+     * Handle exceptions that occur in the session or its child objects.
+     * @param jmsSession The jmsSession associated with the call.
+     * @param jmse The JMSException to check.
+     * @throws JMSException if the exception is to be overridden.
+     */
+    void handleException(final JmsSession jmsSession, final JMSException jmse)
+        throws JMSException
+    {
+        if (messagingConnectionFailure(jmse))
+        {
+            final JmsSessionPool sessionPool = findOwnerPool(jmsSession);
+
+            if(sessionPool != null) {
+                sessionPool.cleanSessionPool() ;
+            }
+            throw new JmsConnectionFailureException("The underlying exception appears to have failed", jmse) ;
+        }
+    }
+    
+    /**
      * Check the exception to see whether it indicates a connection failure in JBM
      * @param jmse The current JMS Exception
      * @return true if it suggests a failure, false otherwise.
@@ -453,22 +473,6 @@
     }
 
     /**
-     * This method is called when the pool needs to cleaned. It closes all open sessions
-     * and the connection.
-     */
-    private void cleanSessionPool()
-    {
-        for(JmsSessionPool sessionPool : sessionPools) {
-            try {
-                sessionPool.cleanSessionPool();
-            } catch(Exception e) {
-                logger.error("Exception while cleaning JmsSessionPool.", e);
-            }
-        }
-        sessionPools.clear();
-    }
-    
-    /**
      * This method is called when the pool needs to destroyed. It closes all open sessions
      * and the connection and removes it from the container's poolMap.
      */
@@ -799,6 +803,8 @@
         {
             // Sessions need to be created in this way because of an issue with JBM.
             // See https://jira.jboss.org/jira/browse/JBESB-1799
+            final long currentID = id ;
+            final Connection currentConnection = jmsConnection ;
             final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
                 public JmsSession call()
                     throws JMSException
@@ -806,9 +812,9 @@
                     final JmsSession session ;
 
                     if (transacted) {
-                        session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+                        session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)currentConnection).createXASession(), currentID);
                     } else {
-                        session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+                        session = new JmsSession(JmsConnectionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID);
                     }
 
                     return session ;

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2009-06-10 11:59:35 UTC (rev 26901)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -22,6 +22,10 @@
 package org.jboss.internal.soa.esb.rosetta.pooling;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.HashSet;
 
 import javax.jms.BytesMessage;
@@ -49,6 +53,10 @@
 public class JmsSession implements Session
 {
     /**
+     * The connection pool.
+     */
+    private final JmsConnectionPool pool ;
+    /**
      * The session delegate.
      */
     private final Session session ;
@@ -80,16 +88,18 @@
     
     /**
      * Create the session wrapper.
+     * @param pool The pool associated with this session.
      * @param session The session delegate.
      * @param id The pool instance id.
      * @param isJTA True if this tales part in a JTA transaction
      * @throws JMSException
      */
-    JmsSession(final Session session, final long id)
+    JmsSession(final JmsConnectionPool pool, final Session session, final long id)
         throws JMSException
     {
+        this.pool = pool ;
         this.id = id ;
-        this.session = session ;
+        this.session = (Session)getExceptionHandler(pool, Session.class, session) ;
         acknowledgeMode = session.getAcknowledgeMode() ;
         // Workaround for JBESB-1873
         if ("org.jboss.jms.client.JBossSession".equals(session.getClass().getName()))
@@ -290,7 +300,7 @@
         }
         final QueueBrowser result = getQueueBrowser(queueBrowser) ;
         queueBrowserSet.add(result) ;
-        return result ;
+        return (QueueBrowser)getExceptionHandler(pool, QueueBrowser.class, result) ;
     }
 
     private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer)
@@ -303,7 +313,7 @@
         }
         final MessageConsumer result = getMessageConsumer(messageConsumer) ;
         messageConsumerSet.add(result) ;
-        return result ;
+        return (MessageConsumer)getExceptionHandler(pool, MessageConsumer.class, result) ;
     }
 
     private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber)
@@ -316,7 +326,7 @@
         }
         final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ;
         messageConsumerSet.add(result) ;
-        return result ;
+        return (TopicSubscriber)getExceptionHandler(pool, TopicSubscriber.class, result) ;
     }
 
     private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer)
@@ -329,7 +339,7 @@
         }
         final MessageProducer result = getMessageProducer(messageProducer) ;
         messageProducerSet.add(result) ;
-        return result ;
+        return (MessageProducer)getExceptionHandler(pool, MessageProducer.class, result) ;
     }
 
     synchronized void releaseResources()
@@ -410,4 +420,62 @@
     {
         return suspect ;
     }
+
+    /**
+     * Wrap the object in an exception handler.
+     * @param pool The pool associated with this session.
+     * @param instanceClass The interface type of the instance.
+     * @param instance The instance
+     * @return 
+     */
+    protected Object getExceptionHandler(final JmsConnectionPool pool,
+            final Class<?> instanceClass, final Object instance)
+    {
+        final InvocationHandler handler = new ExceptionHandler(pool, instance) ;
+        return Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {instanceClass}, handler);
+    }
+    
+    /**
+     * Handler responsible for intercepting JMS exceptions and checking for unreported closure.
+     * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+     */
+    private final class ExceptionHandler implements InvocationHandler
+    {
+        /**
+         * The connection pool.
+         */
+        private final JmsConnectionPool pool ;
+        /**
+         * The target instance.
+         */
+        private final Object target ;
+        
+        /**
+         * Construct the handler using the specified target.
+         * @param pool The associated connection pool.
+         * @param target The target instance.
+         */
+        public ExceptionHandler(final JmsConnectionPool pool, final Object target)
+        {
+            this.pool = pool ;
+            this.target = target ;
+        }
+        
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+        {
+            try
+            {
+                return method.invoke(target, args);
+            }
+            catch (final InvocationTargetException ite)
+            {
+                final Throwable th = ite.getCause() ;
+                if (th instanceof JMSException)
+                {
+                    pool.handleException(JmsSession.this, (JMSException)th) ;
+                }
+                throw th ;
+            }
+        }
+    }
 }

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2009-06-10 11:59:35 UTC (rev 26901)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -79,9 +79,9 @@
     JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
         throws JMSException
     {
-        super(session, id) ;
+        super(pool, session, id) ;
         this.pool = pool ;
-        this.session = session ;
+        this.session = (XASession)getExceptionHandler(pool, XASession.class, session) ;
     }
     
     @Override

Added: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java	2009-06-10 12:13:02 UTC (rev 26902)
@@ -0,0 +1,664 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.Context;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.common.TransactionStrategy.NullTransactionStrategy;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockejb.jms.MockQueue;
+import org.mockejb.jms.MockTopic;
+import org.mockejb.jndi.MockContextFactory;
+
+/**
+ * Unit tests for handling JBM specific error conditions in JmsConnectionPool
+ * 
+ * @author <a href='kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionPoolUnitTest
+{
+    private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
+    private static final String QUEUE_NAME = "testQueue" ;
+    private static final String TOPIC_NAME = "testTopic" ;
+    
+    @Before
+    public void setUp()
+        throws Exception
+    {
+        MockContextFactory.setAsInitial();
+        
+        final Context ctx = NamingContextPool.getNamingContext(null);
+        try
+        {
+            ctx.rebind(CONNECTION_FACTORY, new MockXAConnectionFactory());
+        }
+        finally
+        {
+            NamingContextPool.releaseNamingContext(ctx) ;
+        }
+        System.setProperty(Environment.JNDI_SERVER_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY)) ;
+    }
+    
+    @After
+    public void tearDown()
+        throws Exception
+    {
+        MockContextFactory.revertSetAsInitial();
+    }
+    
+    @Test
+    public void testSessionRepeatableAcquire()
+        throws Exception
+    {
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        final JmsSession session = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        pool.closeSession(session) ;
+        
+        final JmsSession session2 = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+        Assert.assertSame("Same session returned", session, session2) ;
+    }
+    
+    @Test
+    public void testSessionRetry()
+        throws Exception
+    {
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        MockConnectionInvocationHandler.throwFault = true ;
+        final JmsSession session ;
+        try
+        {
+            session = pool.getSession() ;
+            Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault) ;
+        }
+        finally
+        {
+            MockConnectionInvocationHandler.throwFault = false ;
+        }
+        
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        pool.closeSession(session) ;
+    }
+    
+    @Test
+    public void testSessionQueueBrowserRetry()
+        throws Exception
+    {
+        final Queue queue = new MockQueue(QUEUE_NAME) ;
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        final JmsSession session = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        
+        final QueueBrowser queueBrowser = session.createBrowser(queue) ;
+        queueBrowser.close() ;
+        
+        MockSessionInvocationHandler.throwFault = true ;
+        try
+        {
+            session.createBrowser(queue) ;
+            Assert.fail("Expected JmsConnectionFailureException") ;
+        }
+        catch (final JmsConnectionFailureException jmse) {} // expected
+        finally
+        {
+            MockSessionInvocationHandler.throwFault = false ;
+        }
+        
+        pool.closeSession(session) ;
+        
+        final JmsSession session2 = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+        Assert.assertNotSame("Session class", session, session2) ;
+    }
+    
+    @Test
+    public void testSessionMessageConsumerRetry()
+        throws Exception
+    {
+        final Queue queue = new MockQueue(QUEUE_NAME) ;
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        final JmsSession session = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        
+        final MessageConsumer messageConsumer = session.createConsumer(queue) ;
+        messageConsumer.close() ;
+        
+        MockSessionInvocationHandler.throwFault = true ;
+        try
+        {
+            session.createConsumer(queue) ;
+            Assert.fail("Expected JmsConnectionFailureException") ;
+        }
+        catch (final JmsConnectionFailureException jmse) {} // expected
+        finally
+        {
+            MockSessionInvocationHandler.throwFault = false ;
+        }
+        
+        pool.closeSession(session) ;
+        
+        final JmsSession session2 = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+        Assert.assertNotSame("Session class", session, session2) ;
+    }
+    
+    @Test
+    public void testSessionTopicSubscriberRetry()
+        throws Exception
+    {
+        final Topic topic = new MockTopic(TOPIC_NAME) ;
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        final JmsSession session = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        
+        final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+        topicSubscriber.close() ;
+        
+        MockSessionInvocationHandler.throwFault = true ;
+        try
+        {
+            session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+            Assert.fail("Expected JmsConnectionFailureException") ;
+        }
+        catch (final JmsConnectionFailureException jmse) {} // expected
+        finally
+        {
+            MockSessionInvocationHandler.throwFault = false ;
+        }
+        
+        pool.closeSession(session) ;
+        
+        final JmsSession session2 = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+        Assert.assertNotSame("Session class", session, session2) ;
+    }
+    
+    @Test
+    public void testSessionMessageProducerRetry()
+        throws Exception
+    {
+        final Queue queue = new MockQueue(QUEUE_NAME) ;
+        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+        final JmsSession session = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session.getClass()) ;
+        
+        final MessageProducer messageProducer = session.createProducer(queue) ;
+        messageProducer.close() ;
+        
+        MockSessionInvocationHandler.throwFault = true ;
+        try
+        {
+            session.createProducer(queue) ;
+            Assert.fail("Expected JmsConnectionFailureException") ;
+        }
+        catch (final JmsConnectionFailureException jmse) {} // expected
+        finally
+        {
+            MockSessionInvocationHandler.throwFault = false ;
+        }
+        
+        pool.closeSession(session) ;
+        
+        final JmsSession session2 = pool.getSession() ;
+        Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
+        Assert.assertNotSame("Session class", session, session2) ;
+    }
+    
+    @Test
+    public void testXASessionRetry()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+        try
+        {
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            MockConnectionInvocationHandler.throwFault = true ;
+            final JmsSession session ;
+            try
+            {
+                session = pool.getSession() ;
+                Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault) ;
+            }
+            finally
+            {
+                MockConnectionInvocationHandler.throwFault = false ;
+            }
+            
+            Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+            pool.closeSession(session) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
+    @Test
+    public void testXASessionQueueBrowserRetry()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+        try
+        {
+            final Queue queue = new MockQueue(QUEUE_NAME) ;
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            final JmsSession session = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+            
+            final QueueBrowser queueBrowser = session.createBrowser(queue) ;
+            queueBrowser.close() ;
+            
+            MockSessionInvocationHandler.throwFault = true ;
+            try
+            {
+                session.createBrowser(queue) ;
+                Assert.fail("Expected JmsConnectionFailureException") ;
+            }
+            catch (final JmsConnectionFailureException jmse) {} // expected
+            finally
+            {
+                MockSessionInvocationHandler.throwFault = false ;
+            }
+            
+            pool.closeSession(session) ;
+            
+            final JmsSession session2 = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+            Assert.assertNotSame("Session class", session, session2) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
+    @Test
+    public void testXASessionMessageConsumerRetry()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+        try
+        {
+            final Queue queue = new MockQueue(QUEUE_NAME) ;
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            final JmsSession session = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+            
+            final MessageConsumer messageConsumer = session.createConsumer(queue) ;
+            messageConsumer.close() ;
+            
+            MockSessionInvocationHandler.throwFault = true ;
+            try
+            {
+                session.createConsumer(queue) ;
+                Assert.fail("Expected JmsConnectionFailureException") ;
+            }
+            catch (final JmsConnectionFailureException jmse) {} // expected
+            finally
+            {
+                MockSessionInvocationHandler.throwFault = false ;
+            }
+            
+            pool.closeSession(session) ;
+            
+            final JmsSession session2 = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+            Assert.assertNotSame("Session class", session, session2) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
+    @Test
+    public void testXASessionTopicSubscriberRetry()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+        try
+        {
+            final Topic topic = new MockTopic(TOPIC_NAME) ;
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            final JmsSession session = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+            
+            final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+            topicSubscriber.close() ;
+            
+            MockSessionInvocationHandler.throwFault = true ;
+            try
+            {
+                session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry") ;
+                Assert.fail("Expected JmsConnectionFailureException") ;
+            }
+            catch (final JmsConnectionFailureException jmse) {} // expected
+            finally
+            {
+                MockSessionInvocationHandler.throwFault = false ;
+            }
+            
+            pool.closeSession(session) ;
+            
+            final JmsSession session2 = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+            Assert.assertNotSame("Session class", session, session2) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
+    @Test
+    public void testXASessionMessageProducerRetry()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy()) ;
+        try
+        {
+            final Queue queue = new MockQueue(QUEUE_NAME) ;
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            final JmsSession session = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session.getClass()) ;
+            
+            final MessageProducer messageProducer = session.createProducer(queue) ;
+            messageProducer.close() ;
+            
+            MockSessionInvocationHandler.throwFault = true ;
+            try
+            {
+                session.createProducer(queue) ;
+                Assert.fail("Expected JmsConnectionFailureException") ;
+            }
+            catch (final JmsConnectionFailureException jmse) {} // expected
+            finally
+            {
+                MockSessionInvocationHandler.throwFault = false ;
+            }
+            
+            pool.closeSession(session) ;
+            
+            final JmsSession session2 = pool.getSession() ;
+            Assert.assertEquals("Session class", JmsXASession.class, session2.getClass()) ;
+            Assert.assertNotSame("Session class", session, session2) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
+    private Map<String, String> getPoolEnv()
+    {
+        final Map<String, String> env = new HashMap<String, String>() ;
+        env.put(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
+        return env ;
+    }
+    
+    static class MockXAConnectionFactory implements XAConnectionFactory, ConnectionFactory
+    {
+        public XAConnection createXAConnection()
+            throws JMSException
+        {
+            return (XAConnection)Proxy.newProxyInstance(MockXAConnectionFactory.class.getClassLoader(), new Class[] {XAConnection.class},
+                    new MockConnectionInvocationHandler()) ;
+        }
+
+        public XAConnection createXAConnection(final String user, final String password)
+            throws JMSException
+        {
+            return createXAConnection() ;
+        }
+
+        public Connection createConnection()
+            throws JMSException
+        {
+            return (Connection)Proxy.newProxyInstance(MockXAConnectionFactory.class.getClassLoader(), new Class[] {Connection.class},
+                    new MockConnectionInvocationHandler()) ;
+        }
+
+        public Connection createConnection(final String user, final String password)
+            throws JMSException
+        {
+            return createConnection() ;
+        }
+    }
+    
+    static final class MockConnectionInvocationHandler implements InvocationHandler
+    {
+        private ExceptionListener exceptionListener ;
+        static boolean throwFault ;
+            
+        MockConnectionInvocationHandler()
+        {
+        }
+            
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("setExceptionListener".equals(methodName))
+            {
+                exceptionListener = (ExceptionListener)args[0] ;
+                return null ;
+            }
+            else if ("getExceptionListener".equals(methodName))
+            {
+                return exceptionListener ;
+            }
+            else if ("createSession".equals(methodName))
+            {
+                checkFault() ;
+                final Integer acknowledgeMode = (Integer)args[1] ;
+                return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {Session.class},
+                        new MockSessionInvocationHandler(acknowledgeMode)) ;
+            }
+            else if ("createXASession".equals(methodName))
+            {
+                checkFault() ;
+                return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {XASession.class},
+                        new MockSessionInvocationHandler(Session.SESSION_TRANSACTED)) ;
+            }
+            else
+            {
+                System.out.println("Connection method " + method.getName() + " called") ;
+                return null ;
+            }
+        }
+        
+        void fireExceptionListener(final JMSException exception)
+        {
+            if (exceptionListener != null)
+            {
+                exceptionListener.onException(exception) ;
+            }
+        }
+        
+        private void checkFault()
+            throws JMSException
+        {
+            if (throwFault)
+            {
+                final JMSException exception = new JMSException("Test exception") ;
+                exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
+                // clear it down to allow retry
+                throwFault = false ;
+                throw exception ;
+            }
+        }
+    }
+    
+    static final class MockSessionInvocationHandler implements InvocationHandler
+    {
+        private final Integer acknowledgeMode ;
+        static boolean throwFault ;
+            
+        MockSessionInvocationHandler(final Integer acknowledgeMode)
+        {
+            this.acknowledgeMode = acknowledgeMode ;
+        }
+            
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("getAcknowledgeMode".equals(methodName))
+            {
+                return acknowledgeMode ;
+            }
+            else if ("createBrowser".equals(methodName))
+            {
+                checkFault() ;
+                return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {QueueBrowser.class},
+                        new MockNullInvocationHandler()) ;
+            }
+            else if ("createConsumer".equals(methodName))
+            {
+                checkFault() ;
+                return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageConsumer.class},
+                        new MockNullInvocationHandler()) ;
+            }
+            else if ("createDurableSubscriber".equals(methodName))
+            {
+                checkFault() ;
+                return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {TopicSubscriber.class},
+                        new MockNullInvocationHandler()) ;
+            }
+            else if ("createProducer".equals(methodName))
+            {
+                checkFault() ;
+                return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageProducer.class},
+                        new MockNullInvocationHandler()) ;
+            }
+            else
+            {
+                System.out.println("Session method " + method.getName() + " called") ;
+                return null ;
+            }
+        }
+        
+        private void checkFault()
+            throws JMSException
+        {
+            if (throwFault)
+            {
+                final JMSException exception = new JMSException("Test exception") ;
+                exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
+                throw exception ;
+            }
+        }
+    }
+    
+    static final class MockNullInvocationHandler implements InvocationHandler
+    {
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("hashCode".equals(methodName))
+            {
+                return System.identityHashCode(proxy) ;
+            }
+            else if ("equals".equals(methodName))
+            {
+                return proxy == args[0] ;
+            }
+            else
+            {
+                return null ;
+            }
+        }
+    }
+    
+    private static final class MockActiveTransactionStrategy extends NullTransactionStrategy
+    {
+        @Override
+        public boolean isActive()
+            throws TransactionStrategyException
+        {
+            return true ;
+        }
+        
+        @Override
+        public void registerSynchronization(final Synchronization sync)
+                throws TransactionStrategyException
+        {
+        }
+        
+        @Override
+        public void enlistResource(final XAResource resource)
+                throws TransactionStrategyException
+        {
+        }
+        
+        @Override
+        public Object getTransaction()
+            throws TransactionStrategyException
+        {
+            return this ;
+        }
+    }
+    
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(JmsConnectionPoolUnitTest.class);
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list