[jboss-svn-commits] JBL Code SVN: r26301 - in labs/jbossesb/workspace: blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Apr 29 06:15:00 EDT 2009


Author: kevin.conner at jboss.com
Date: 2009-04-29 06:14:59 -0400 (Wed, 29 Apr 2009)
New Revision: 26301

Added:
   labs/jbossesb/workspace/blixen/
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
Modified:
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
   labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
   labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
Log:
Added for blixen

Copied: labs/jbossesb/workspace/blixen (from rev 23452, labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA)

Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -23,12 +23,12 @@
 package org.jboss.internal.soa.esb.couriers;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Properties;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -41,6 +41,7 @@
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.couriers.helpers.JmsComposer;
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionFailureException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
@@ -176,9 +177,30 @@
      *                          if problems were encountered
      */
     public boolean deliver(org.jboss.soa.esb.message.Message message) throws CourierException {
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+        }
+    }
+    
+    private boolean internalDeliver(org.jboss.soa.esb.message.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
         ObjectMessage msg;
 
         if (null == message) {
+            _logger.warn("KEV: null message sent into ESB deliver") ;
             return false;
         }
         
@@ -206,11 +228,15 @@
         // Set the JMS message from the ESB message...
         try {
             setJMSProperties(message, msg);
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw jcfe ;
+        } catch (final IllegalStateException ise) {
+            throw ise ;
         } catch (JMSException e) {
             throw new CourierMarshalUnmarshalException("Failed to set JMS Message properties from ESB Message properties.", e);
         }
 
-        return deliver(msg);
+        return internalDeliver(msg);
     }
 
     /**
@@ -222,11 +248,32 @@
      *                          if problems were encountered
      */
     public boolean deliver(javax.jms.Message message) throws CourierException {
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalDeliver(message) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during delivery and could not reconnect! ", ise);
+        }
+    }
+    
+    private boolean internalDeliver(javax.jms.Message message) throws CourierException, JmsConnectionFailureException, IllegalStateException {
         if (_isReceiver) {
             throw new CourierException("This is a read-only Courier");
         }
 
         if (null == message) {
+            _logger.warn("KEV: null message sent into JMS deliver") ;
             return false;
         }
         synchronized(this) {
@@ -253,6 +300,12 @@
                     
                     return true;
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException e) {
                     if (!jmsConnectRetry(e))
                         throw new CourierTransportException("Caught exception during delivery and could not reconnect! ",e);
@@ -312,7 +365,7 @@
         return true;
     } // ________________________________
 
-    private void createMessageProducer() throws CourierException, NamingContextException {
+    private void createMessageProducer() throws CourierException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
         synchronized(this) {
             if (_messageProducer == null) {
                 try {
@@ -354,6 +407,12 @@
                         }
                     }
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException ex) {
                     _logger.debug("Error from JMS system.", ex);
 
@@ -389,6 +448,26 @@
     }
 
     public javax.jms.Message pickupPayload(long millis) throws CourierException, CourierTimeoutException {
+        try {
+            return internalPickupPayload(millis) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            // fall through
+        } catch (final IllegalStateException ise) {
+            // fall through
+        }
+        
+        cleanup() ;
+        
+        try {
+            return internalPickupPayload(millis) ;
+        } catch (final JmsConnectionFailureException jcfe) {
+            throw new CourierTransportException("Caught exception during receive and could not reconnect! ", jcfe);
+        } catch (final IllegalStateException ise) {
+            throw new CourierTransportException("Caught exception during receive and could not reconnect! ", ise);
+        }
+    }
+    
+    private javax.jms.Message internalPickupPayload(long millis) throws CourierException, CourierTimeoutException, JmsConnectionFailureException, IllegalStateException {
         if (!_isReceiver)
             throw new CourierException("This is an outgoing-only Courier");
         if (millis < 1)
@@ -414,6 +493,12 @@
                     jmsMessage = _messageConsumer.receive(millis);
                     break;
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException e) {
                     if (!jmsConnectRetry(e))
                         throw new CourierTransportException("Caught exception during receive and could not reconnect! ",e);
@@ -467,7 +552,7 @@
         esbPropertiesStrategy.setPropertiesFromJMSMessage(fromJMS, toESB);
     }
 
-    private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
+    private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException, JmsConnectionFailureException, IllegalStateException {
         Context oJndiCtx = null;
 
         synchronized(this) {
@@ -513,6 +598,12 @@
                         NamingContextPool.releaseNamingContext(oJndiCtx) ;
                     }
                 }
+                catch (final JmsConnectionFailureException jcfe) {
+                    throw jcfe ;
+                }
+                catch (final IllegalStateException ise) {
+                    throw ise ;
+                }
                 catch (JMSException ex) {
                     _logger.debug("Error from JMS system.", ex);
 

Added: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
===================================================================
--- labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java	                        (rev 0)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -0,0 +1,59 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.jms.JMSException;
+
+/**
+ * This exception is thrown from a JMS session/producer/consumer method when the
+ * underlying connection appears to have been closed.  This will occur when the connection
+ * has not been closed through the standard JMS exception listener route.
+ * 
+ * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+ */
+public class JmsConnectionFailureException extends JMSException
+{
+    /**
+     * The serial version UID of this exception
+     */
+    private static final long serialVersionUID = 4352490530798177236L;
+
+    /**
+     * Construct a JMS connection failure exception with the specified message.
+     * @param message The exception message.
+     */
+    public JmsConnectionFailureException(final String message) 
+    {
+        super(message) ;
+    }
+
+    /**
+     * Construct a JMS connection failure exception with the specified message and associated cause.
+     * @param message The exception message.
+     * @param cause The associated cause.
+     */
+    public JmsConnectionFailureException(String message, Throwable cause)
+    {
+        this(message) ;
+        initCause(cause) ;
+    }
+}


Property changes on: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionFailureException.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -161,15 +161,17 @@
     private  synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
         throws JMSException
     {
+        final long currentID = id ;
+        final Connection currentConnection = jmsConnection ;
         final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
             public JmsSession call()
                 throws JMSException
             {
                 final JmsSession session ;
                 if (transacted) {
-                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)currentConnection).createXASession(), currentID);
                 } else {
-                    session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+                    session = new JmsSession(JmsConnectionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID);
                 }
                 return session ;
             }
@@ -210,6 +212,24 @@
      */
     public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
     {
+        try
+        {
+            return internalGetSession(acknowledgeMode) ;
+        }
+        catch (final JMSException jmse)
+        {
+            if (messagingConnectionFailure(jmse))
+            {
+                cleanSessionPool() ;
+                return internalGetSession(acknowledgeMode) ;
+            }
+            throw jmse ;
+        }
+    }
+
+    private synchronized JmsSession internalGetSession(final int acknowledgeMode)
+        throws NamingException, JMSException, ConnectionException
+    {
         try {
             initConnection() ;
         } catch (final NamingContextException nce) {
@@ -309,23 +329,36 @@
      */
     synchronized void handleCloseSession(final JmsSession session)
     {
-        if (session.getId() == id)
+        if (session.isSuspect())
         {
-            final int mode ;
-            try {
-                mode = session.getAcknowledgeMode() ;
-            } catch (final JMSException jmse) {
-                logger.warn("JMSException while calling getAcknowledgeMode") ;
-                logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
-                return ;
+            logger.debug("Session is suspect, dropping") ;
+            handleReleaseSession(session) ;
+        }
+        else
+        {
+            if (session.getId() != id)
+            {
+                logger.debug("Session is from a previous incarnation, dropping") ;
             }
-            
-            final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
-            if (sessions != null) {
-                sessions.add(session) ;
+            else
+            {
+                final int mode ;
+                try {
+                    mode = session.getAcknowledgeMode() ;
+                } catch (final JMSException jmse) {
+                    logger.warn("JMSException while calling getAcknowledgeMode") ;
+                    logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+                    return ;
+                }
+                
+                final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+                if (sessions != null) {
+                    sessions.add(session) ;
+                }
             }
+            session.releaseResources() ;
+            releaseInUseSession(session) ;
         }
-        handleReleaseSession(session) ;
     }
     
     /**
@@ -334,7 +367,21 @@
      */
     synchronized void handleReleaseSession(final JmsSession session)
     {
-        session.releaseResources() ;
+        session.releaseResources();
+        try
+        {
+            session.close() ;
+        }
+        catch (final Throwable th) {} // ignore
+        releaseInUseSession(session) ;
+    }
+    
+    /**
+     * Release a session from the in use mapping.
+     * @param session The session to release.
+     */
+    private void releaseInUseSession(final JmsSession session)
+    {
         final int mode ;
         try {
             mode = session.getAcknowledgeMode() ;
@@ -564,6 +611,43 @@
     }
     
     /**
+     * Check the exception to see whether it indicates a connection failure in JBM
+     * @param jmse The current JMS Exception
+     * @return true if it suggests a failure, false otherwise.
+     */
+    private boolean messagingConnectionFailure(final JMSException jmse)
+    {
+        /*
+         * JBoss Messaging can drop the connection from the server side
+         * without calling back on the exception listener.  We check for
+         * IllegalStateException as this appears to be the indicator
+         * exception used by JBoss Messaging when the connection has disappeared.
+         */
+        Throwable cause = jmse ;
+        while(cause.getCause() != null)
+        {
+            cause = cause.getCause() ;
+        }
+        return (cause instanceof IllegalStateException) ;
+    }
+
+    /**
+     * Handle exceptions that occur in the session or its child objects.
+     * @param jmsSession The jmsSession associated with the call.
+     * @param jmse The JMSException to check.
+     * @throws JMSException if the exception is to be overridden.
+     */
+    void handleException(final JmsSession jmsSession, final JMSException jmse)
+        throws JMSException
+    {
+        if (messagingConnectionFailure(jmse))
+        {
+            cleanSessionPool() ;
+            throw new JmsConnectionFailureException("The underlying exception appears to have failed", jmse) ;
+        }
+    }
+    
+    /**
      * Associate the JMS XA Session with the current transaction.
      * @param session The XA session.
      * @throws ConnectionException if there is no transaction active.

Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -22,6 +22,10 @@
 package org.jboss.internal.soa.esb.rosetta.pooling;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.HashSet;
 
 import javax.jms.BytesMessage;
@@ -49,6 +53,10 @@
 public class JmsSession implements Session
 {
     /**
+     * The connection pool.
+     */
+    private final JmsConnectionPool pool ;
+    /**
      * The session delegate.
      */
     private final Session session ;
@@ -56,6 +64,14 @@
      * The pool instance id.
      */
     private final long id ;
+    /**
+     * The session acknowledge mode.
+     */
+    private final int acknowledgeMode ;
+    /**
+     * Flag indicating whether this session is suspect or not.
+     */
+    private boolean suspect ;
     
     /**
      * The set of active queue browsers.
@@ -72,16 +88,19 @@
     
     /**
      * Create the session wrapper.
+     * @param pool The pool associated with this session.
      * @param session The session delegate.
      * @param id The pool instance id.
      * @param isJTA True if this tales part in a JTA transaction
      * @throws JMSException
      */
-    JmsSession(final Session session, final long id)
+    JmsSession(final JmsConnectionPool pool, final Session session, final long id)
         throws JMSException
     {
         this.id = id ;
-        this.session = session ;
+        this.pool = pool ;
+        this.session = (Session)getExceptionHandler(pool, Session.class, session) ;
+        acknowledgeMode = session.getAcknowledgeMode() ;
         // Workaround for JBESB-1873
         if ("org.jboss.jms.client.JBossSession".equals(session.getClass().getName()))
         {
@@ -101,7 +120,9 @@
 
     public void commit() throws JMSException
     {
+        setSuspect(true) ;
         session.commit();
+        setSuspect(false) ;
     }
 
     public QueueBrowser createBrowser(Queue arg0, String arg1)
@@ -224,8 +245,7 @@
 
     public int getAcknowledgeMode() throws JMSException
     {
-        associate() ;
-        return session.getAcknowledgeMode();
+        return acknowledgeMode;
     }
 
     public MessageListener getMessageListener() throws JMSException
@@ -248,7 +268,9 @@
 
     public void rollback() throws JMSException
     {
+        setSuspect(true) ;
         session.rollback();
+        setSuspect(false) ;
     }
 
     public void run()
@@ -278,7 +300,7 @@
         }
         final QueueBrowser result = getQueueBrowser(queueBrowser) ;
         queueBrowserSet.add(result) ;
-        return result ;
+        return (QueueBrowser)getExceptionHandler(pool, QueueBrowser.class, result) ;
     }
 
     private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer)
@@ -291,7 +313,7 @@
         }
         final MessageConsumer result = getMessageConsumer(messageConsumer) ;
         messageConsumerSet.add(result) ;
-        return result ;
+        return (MessageConsumer)getExceptionHandler(pool, MessageConsumer.class, result) ;
     }
 
     private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber)
@@ -304,7 +326,7 @@
         }
         final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ;
         messageConsumerSet.add(result) ;
-        return result ;
+        return (TopicSubscriber)getExceptionHandler(pool, TopicSubscriber.class, result) ;
     }
 
     private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer)
@@ -317,7 +339,7 @@
         }
         final MessageProducer result = getMessageProducer(messageProducer) ;
         messageProducerSet.add(result) ;
-        return result ;
+        return (MessageProducer)getExceptionHandler(pool, MessageProducer.class, result) ;
     }
 
     synchronized void releaseResources()
@@ -352,11 +374,6 @@
             }
             messageProducerSet = null ;
         }
-        try
-        {
-            recover() ;
-        }
-        catch (final JMSException jmse) {} // ignore
     }
 
     protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
@@ -393,4 +410,71 @@
         throws JMSException
     {
     }
+    
+    protected void setSuspect(final boolean suspect)
+    {
+        this.suspect = suspect ;
+    }
+    
+    public boolean isSuspect()
+    {
+        return suspect ;
+    }
+
+    /**
+     * Wrap the object in an exception handler.
+     * @param pool The current connection pool.
+     * @param instanceClass The interface type of the instance.
+     * @param instance The instance
+     * @return 
+     */
+    protected Object getExceptionHandler(final JmsConnectionPool pool,
+            final Class<?> instanceClass, final Object instance)
+    {
+        final InvocationHandler handler = new ExceptionHandler(pool, instance) ;
+        return Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {instanceClass}, handler);
+    }
+    
+    /**
+     * Handler responsible for intercepting JMS exceptions and checking for unreported closure.
+     * @author <a href='mailto:kevin.conner at jboss.com'>Kevin Conner</a>
+     */
+    private final class ExceptionHandler implements InvocationHandler
+    {
+        /**
+         * The connection pool.
+         */
+        private final JmsConnectionPool pool ;
+        /**
+         * The target instance.
+         */
+        private final Object target ;
+        
+        /**
+         * Construct the handler using the specified target.
+         * @param target The target instance.
+         */
+        public ExceptionHandler(final JmsConnectionPool pool, final Object target)
+        {
+            this.pool = pool ;
+            this.target = target ;
+        }
+        
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+        {
+            try
+            {
+                return method.invoke(target, args);
+            }
+            catch (final InvocationTargetException ite)
+            {
+                final Throwable th = ite.getCause() ;
+                if (th instanceof JMSException)
+                {
+                    pool.handleException(JmsSession.this, (JMSException)th) ;
+                }
+                throw th ;
+            }
+        }
+    }
 }

Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -32,6 +32,7 @@
 import javax.jms.QueueBrowser;
 import javax.jms.TopicSubscriber;
 import javax.jms.XASession;
+import javax.transaction.Status;
 import javax.transaction.Synchronization;
 import javax.transaction.xa.XAResource;
 
@@ -78,11 +79,11 @@
     JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
         throws JMSException
     {
-        super(session, id) ;
+        super(pool, session, id) ;
         this.pool = pool ;
-        this.session = session ;
+        this.session = (XASession)getExceptionHandler(pool, XASession.class, session) ;
     }
-    
+
     @Override
     public void commit() throws JMSException
     {
@@ -162,11 +163,12 @@
         if (!associated)
         {
             cleanupAction = Cleanup.none ;
-            final XAResource resource = session.getXAResource() ;
             final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
             try
             {
                 transactionStrategy.registerSynchronization(this) ;
+                setSuspect(true) ;
+                final XAResource resource = session.getXAResource() ;
                 transactionStrategy.enlistResource(resource) ;
             }
             catch (final TransactionStrategyException tse)
@@ -186,6 +188,7 @@
                 throw ex ;
             }
             
+            setSuspect(false) ;
             associated = true ;
         }
     }
@@ -200,8 +203,12 @@
         switch (cleanupAction)
         {
         case close:
-            pool.handleCloseSession(this) ;
-            break ;
+            if (result == Status.STATUS_COMMITTED)
+            {
+                pool.handleCloseSession(this) ;
+                break ;
+            }
+            // fall through
         case release:
             pool.handleReleaseSession(this) ;
             break ;

Modified: labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -571,6 +571,7 @@
                 } catch (FaultMessageException e) {
                     throw e;
                 } catch (final CourierServiceBindException e) {
+                    logger.warn("KEV: Caught exception", e) ;
                     // meant to be masked by the SI fail-over
 
                     logger.debug("Caught service lookup exception for EPR [" + targetEPR + "] and Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());
@@ -581,6 +582,7 @@
 
                     throw new MessageDeliverException("Caught (un)marshal related exception during attempted send/receive.", e);
                 } catch (final CourierTransportException e) {
+                    logger.warn("KEV: Caught exception", e) ;
                     // meant to be masked by the SI fail-over
 
                     logger.debug("Courier indicated transport related error "+e+" during send/receive with EPR [" + targetEPR + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());

Modified: labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_4_SOA_4_3_GA/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java	2008-10-14 09:28:56 UTC (rev 23452)
+++ labs/jbossesb/workspace/blixen/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java	2009-04-29 10:14:59 UTC (rev 26301)
@@ -23,19 +23,32 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Properties;
 
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
 import javax.jms.Session;
 import javax.naming.Context;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
 
 import junit.framework.JUnit4TestAdapter;
 
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
-import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.helpers.NamingContextPool;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockejb.jms.QueueConnectionFactoryImpl;
+import org.mockejb.jndi.MockContextFactory;
 
 /**
  * @author kstam
@@ -43,21 +56,42 @@
  *
  */
 public class JmsConnectionPoolingIntegrationTest {
-	
-	private Properties environment;
-	
-	@Before
-	public void setup()
-	{
-		environment = getEnvironment();
-	}
     
+    private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
+    
+    private static final String BROKEN_CONNECTION_FACTORY = "BrokenConnectionFactory" ;
+    
+    private MockTransactionStrategy transactionStrategy = new MockTransactionStrategy() ;
+    
+    @Before
+    public void setup() throws Exception
+    {
+        MockContextFactory.setAsInitial() ;
+        final Context ctx = NamingContextPool.getNamingContext(null);
+        try
+        {
+            ctx.rebind(CONNECTION_FACTORY, new MockQueueConnectionFactory());
+        }
+        finally
+        {
+            NamingContextPool.releaseNamingContext(ctx) ;
+        }
+        TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+    }
+    
+    @After
+    public void tearDown()
+    {
+        TransactionStrategy.setTransactionStrategy(null) ;
+        MockContextFactory.revertSetAsInitial() ;
+    }
+    
     @Test
     public void testPoolAndConnectionCreation()  throws Exception
     {
         JmsConnectionPool jmsConnectionPool = null;
         
-        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         //Open 3 concurrent sessions
         JmsSession session1 = jmsConnectionPool.getSession();
@@ -76,7 +110,7 @@
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
         
-        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
         jmsConnectionPool.getSession();
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
@@ -88,12 +122,14 @@
     @Test
     public void testCreateSecondPool() throws Exception
     {
-        JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
-        jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
+        JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
+        jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
         //This should be the same pool
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
-    
-        JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory");
+        
+        final Properties environment = new Properties() ;
+        environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, MockContextFactory.class.getName());
+        JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(environment, CONNECTION_FACTORY);
         //This should be a different pool, so now we should have 2.
         assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
         
@@ -108,7 +144,7 @@
     @Test
     public void testPoolAndSessionsWithAcknowledgeMode()  throws Exception
     {
-        JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         
         JmsSession autoAckSession1 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
@@ -176,7 +212,7 @@
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
         
-        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, CONNECTION_FACTORY);
         jmsConnectionPool.getSession();
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
@@ -184,17 +220,121 @@
         jmsConnectionPool.removeSessionPool();
     }
     
-    public Properties getEnvironment()
+    public static junit.framework.Test suite()
     {
-    	  Properties environment = new Properties();
-          environment.setProperty(Context.PROVIDER_URL, Environment.JBOSS_PROVIDER_URL);
-          environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, Environment.JBOSS_INITIAL_CONTEXT_FACTORY);
-          environment.setProperty(Context.URL_PKG_PREFIXES, Environment.JBOSS_URL_PKG_PREFIX);
-          return environment;
+        return new JUnit4TestAdapter(JmsConnectionPoolingIntegrationTest.class);
     }
     
-    public static junit.framework.Test suite()
+    private static final class MockQueueConnectionFactory extends QueueConnectionFactoryImpl
     {
-        return new JUnit4TestAdapter(JmsConnectionPoolingIntegrationTest.class);
+        @Override
+        public QueueConnection createQueueConnection() throws JMSException
+        {
+            return (QueueConnection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueConnection.class},
+                    new MockQueueExceptionHandlerInvocationHandler(super.createQueueConnection())) ;
+        }
     }
+
+    private static final class MockQueueExceptionHandlerInvocationHandler implements InvocationHandler
+    {
+        private final QueueConnection queueConnection ;
+        private ExceptionListener exceptionListener ;
+
+        MockQueueExceptionHandlerInvocationHandler(final QueueConnection queueConnection)
+        {
+            this.queueConnection = queueConnection ;
+        }
+
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("setExceptionListener".equals(methodName))
+            {
+                exceptionListener = (ExceptionListener)args[0] ;
+                return null ;
+            }
+            else if ("getExceptionListener".equals(methodName))
+            {
+                return exceptionListener ;
+            }
+            else
+            {
+                final Object response = method.invoke(queueConnection, args) ;
+//                if (response instanceof QueueSession)
+//                {
+//                    final QueueSession queueSession = (QueueSession)response ;
+//                    return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+//                            new MockQueueSessionInvocationHandler(queueSession)) ;
+//                }
+//                else
+//                {
+                    return response ;
+//                }
+            }
+        }
+    }
+    
+    private static final class MockTransactionStrategy extends TransactionStrategy
+    {
+        private final Object tx = new Object() ;
+        public boolean active ;
+        
+        @Override
+        public void begin()
+            throws TransactionStrategyException
+        {
+        }
+
+        @Override
+        public void enlistResource(final XAResource resource)
+                throws TransactionStrategyException
+        {
+        }
+
+        @Override
+        public Object getTransaction()
+            throws TransactionStrategyException
+        {
+            return tx;
+        }
+
+        @Override
+        public boolean isActive()
+            throws TransactionStrategyException
+        {
+            return active;
+        }
+
+        @Override
+        public void registerSynchronization(final Synchronization sync)
+                throws TransactionStrategyException
+        {
+        }
+
+        @Override
+        public void resume(final Object tx)
+            throws TransactionStrategyException
+        {
+        }
+
+        @Override
+        public void rollbackOnly()
+            throws TransactionStrategyException
+        {
+        }
+
+        @Override
+        public Object suspend()
+            throws TransactionStrategyException
+        {
+            return null;
+        }
+
+        @Override
+        public void terminate()
+            throws TransactionStrategyException
+        {
+        }
+    }
 }




More information about the jboss-svn-commits mailing list