[jboss-svn-commits] JBL Code SVN: r19172 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling and 12 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Mar 21 08:27:48 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-03-21 08:27:47 -0400 (Fri, 21 Mar 2008)
New Revision: 19172

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
Removed:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/xa/
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
   labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
   labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
   labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
Log:
Fix JMS/TX integration: JBESB-1618

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,11 +32,8 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
-import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
 import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.xml.parsers.ParserConfigurationException;
@@ -46,13 +43,10 @@
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
-import org.jboss.internal.soa.esb.rosetta.pooling.xa.XaJmsConnectionPool;
-import org.jboss.internal.soa.esb.rosetta.pooling.xa.XaJmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -125,57 +119,46 @@
         }
     } // ________________________________
 
-    private void closeSession() {
-        synchronized(this) {
-            if (jmsSession != null) {
-                try {
-                    getConnectionPool().closeSession(jmsSession);
-                } catch (ConnectionException e) {
-                    _logger.error("Unable to get Connection Poll for closing of JMS Session.", e);
-                } finally {
-                    jmsSession = null;
-                }
+    private synchronized void closeSession() {
+        if (jmsSession != null) {
+            try {
+                getConnectionPool().closeSession(jmsSession);
+            } catch (ConnectionException e) {
+                _logger.error("Unable to get Connection Pool for closing of JMS Session.", e);
+            } finally {
+                jmsSession = null;
             }
         }
     }
+
+    private synchronized void releaseSession() {
+        if (jmsSession != null) {
+            try {
+                getConnectionPool().releaseSession(jmsSession);
+            } catch (ConnectionException e) {
+                _logger.error("Unable to get Connection Pool for releasing of JMS Session.", e);
+            } finally {
+                jmsSession = null;
+            }
+        }
+    }
     
     public Session getJmsSession() throws CourierException {
     	return getJmsSession(Session.AUTO_ACKNOWLEDGE);
     }
 
-    public Session getJmsSession(final int acknowledgeMode) throws CourierException {
-	reset();
-	
+    public synchronized Session getJmsSession(final int acknowledgeMode) throws CourierException {
         if(jmsSession == null) {
-            synchronized(this) {
-        	checkTransaction();
-        	
-                if(jmsSession == null) {
-                    String sType;
-
-                    try {
-                        sType = _epr.getDestinationType();
-                    } catch (URISyntaxException e) {
-                        throw new CourierException("EPR.getDestinationType failed.", e);
-                    }
-
-                    try {
-                        if (JMSEpr.QUEUE_TYPE.equals(sType)) {
-                            jmsSession = getConnectionPool().getQueueSession(acknowledgeMode);
-                        } else {
-                            jmsSession = getConnectionPool().getTopicSession(acknowledgeMode);
-                        }
-                    } catch (NamingException e) {
-                        throw new CourierException("Failed to get JMS Session from pool.", e);
-                    } catch (JMSException e) {
-                        throw new CourierException("Failed to get JMS Session from pool.", e);
-                    } catch (ConnectionException e) {
-                        throw new CourierException("Failed to get JMS Session from pool.", e);
-                    }
-                }
+            try {
+                jmsSession = getConnectionPool().getSession(acknowledgeMode);
+            } catch (NamingException e) {
+                throw new CourierException("Failed to get JMS Session from pool.", e);
+            } catch (JMSException e) {
+                throw new CourierException("Failed to get JMS Session from pool.", e);
+            } catch (ConnectionException e) {
+                throw new CourierException("Failed to get JMS Session from pool.", e);
             }
         }
-
         return jmsSession;
     }
 
@@ -194,11 +177,13 @@
             return false;
         }
         
-        if (_messageProducer == null) {
-            try {
-                createMessageProducer();
-            } catch (final NamingContextException nce) {
-                throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+        synchronized(this) {
+            if (_messageProducer == null) {
+                try {
+                    createMessageProducer();
+                } catch (final NamingContextException nce) {
+                    throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+                }
             }
         }
 
@@ -239,66 +224,58 @@
         if (null == message) {
             return false;
         }
-	
-	reset();
-	
-        if (_messageProducer == null) {
-            try {
-                createMessageProducer();
-            } catch (final NamingContextException nce) {
-                throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+        synchronized(this) {
+            if (_messageProducer == null) {
+                try {
+                    createMessageProducer();
+                } catch (final NamingContextException nce) {
+                    throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+                }
             }
         }
 
-        while (null != _messageProducer) {
-            try {
-                for (KeyValuePair kvp : _messageProperties) {
-                    String key = kvp.getKey();
-                    if(message.getStringProperty(key) == null) {
-                        message.setStringProperty(key, kvp.getValue());
+        synchronized(this) {
+            while (_messageProducer != null) {
+                try {
+                    for (KeyValuePair kvp : _messageProperties) {
+                        String key = kvp.getKey();
+                        if(message.getStringProperty(key) == null) {
+                            message.setStringProperty(key, kvp.getValue());
+                        }
                     }
+                    
+                    _messageProducer.send(message);
+                    
+                    return true;
                 }
-
-                checkTransaction();
-                
-                sendMessage(message);
-                if ( jmsSession.getTransacted() && !transactional )
-	                jmsSession.commit();
-                
-                return true;
+                catch (JMSException e) {
+                    if (!jmsConnectRetry(e))
+                        throw new CourierException("Caught exception during delivery and could not reconnect! ",e);
+                }
+                catch (Exception e) {
+                    throw new CourierException(e);
+                }
             }
-            catch (JMSException e) {
-                jmsConnectRetry(e);
-            }
-            catch (Exception e) {
-                throw new CourierException(e);
-            }
         }
         return false;
     } // ________________________________
 
 
-    /**
-     * send/publish a javax.jms.ObjectMessage (that will contain the serialized
-     * ESB Message)
-     *
-     * @param jmsMessage
-     */
-    private void sendMessage(javax.jms.Message jmsMessage) throws JMSException, URISyntaxException {
-        String sType = _epr.getDestinationType();
-        if (JMSEpr.TOPIC_TYPE.equals(sType)) {
-            ((TopicPublisher) _messageProducer).publish(jmsMessage);
-        } else {
-            _messageProducer.send(jmsMessage);
-        }
-
-    } // ________________________________
-
-    private void jmsConnectRetry(Exception exc) {
+    private boolean jmsConnectRetry(Exception exc) {
         _logger.debug("JMS error.  Attempting JMS reconnect.", exc);
 
         synchronized(this) {
-            cleanup();
+            try {
+                if (jmsSession.getTransacted()) {
+                    jmsSession.rollback() ;
+                    return false ;
+                } else {
+                    cleanup() ;
+                }
+            } catch (final JMSException jmse) {
+                releaseSession() ;
+                return false ;
+            }
 
             final int maxRetry = 5; // TODO Magic number here!!!
             for (int i1 = 0; i1 < maxRetry; i1++) {
@@ -320,195 +297,104 @@
                         }
                     } else {
                         _logger.debug("Failed to reconnect to JMS", e);
+                        
+                        return false ;
                     }
                 }
             }
         }
+        
+        return true ;
     } // ________________________________
 
     private void createMessageProducer() throws CourierException, NamingContextException {
         Context oJndiCtx = null;
-
-        reset();
-	
-        checkTransaction();
         
-        if (_messageProducer == null) {
-            synchronized(this) {
-                if (_messageProducer == null) {
-                    try {
-                        oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
+        synchronized(this) {
+            if (_messageProducer == null) {
+                try {
+                    oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
 
-                        String sType = _epr.getDestinationType();
-                        if (JMSEpr.QUEUE_TYPE.equals(sType)) {
-                            QueueSession qSess = (QueueSession) getJmsSession(_epr.getAcknowledgeMode());
-                            javax.jms.Queue queue = null;
+                    String sType = _epr.getDestinationType();
+                    if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+                        Session qSess = getJmsSession(_epr.getAcknowledgeMode());
+                        javax.jms.Queue queue = null;
+                        try {
+                            queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
+                                    .getDestinationName());
+                        } catch (NamingException ne) {
                             try {
+                                oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
                                 queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
                                         .getDestinationName());
-                            } catch (NamingException ne) {
-                                try {
-                                    oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
-                                    queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                            .getDestinationName());
-                                } catch (NamingException nex) {
-                                    //ActiveMQ
-                                    queue = qSess.createQueue(_epr.getDestinationName());
-                                }
+                            } catch (NamingException nex) {
+                                //ActiveMQ
+                                queue = qSess.createQueue(_epr.getDestinationName());
                             }
-                            _messageProducer = qSess.createSender(queue);
-                        } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
-                            TopicSession tSess = (TopicSession) getJmsSession(_epr.getAcknowledgeMode());
-                            Topic topic = null;
-                            try {
-                                topic = (Topic) oJndiCtx.lookup(_epr
-                                        .getDestinationName());
-                            }
-                            catch (NamingException ne) {
-                                topic = tSess.createTopic(_epr.getDestinationName());
-                            }
-                            _messageProducer = tSess.createPublisher(topic);
-                        } else {
-                            throw new CourierException("Unknown destination type");
                         }
-                        _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-                        if ( _logger.isDebugEnabled() )
-                            _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+                        _messageProducer = qSess.createProducer(queue);
+                    } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+                        Session tSess = getJmsSession(_epr.getAcknowledgeMode());
+                        Topic topic = null;
+                        try {
+                            topic = (Topic) oJndiCtx.lookup(_epr
+                                    .getDestinationName());
+                        }
+                        catch (NamingException ne) {
+                            topic = tSess.createTopic(_epr.getDestinationName());
+                        }
+                        _messageProducer = tSess.createProducer(topic);
+                    } else {
+                        throw new CourierException("Unknown destination type");
                     }
-                    catch (JMSException ex) {
-                        _logger.debug("Error from JMS system.", ex);
+                    _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
+                    if ( _logger.isDebugEnabled() )
+                        _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+                }
+                catch (JMSException ex) {
+                    _logger.debug("Error from JMS system.", ex);
 
-                        throw new CourierException(ex);
+                    throw new CourierException(ex);
+                }
+                catch (URISyntaxException ex) {
+                    throw new CourierException(ex);
+                } finally {
+                    if (oJndiCtx != null) {
+                        NamingContextPool.releaseNamingContext(oJndiCtx) ;
                     }
-                    catch (URISyntaxException ex) {
-                        throw new CourierException(ex);
-                    } finally {
-                        if (oJndiCtx != null) {
-                            NamingContextPool.releaseNamingContext(oJndiCtx) ;
-                        }
-                    }
                 }
             }
         }
     } // ________________________________
 
     private JmsConnectionPool getConnectionPool() throws ConnectionException {
-	reset();
-	
-        if (jmsConnectionPool == null) {
-            synchronized(this) {
-        	try
-        	{
-        	    checkTransaction();
-        	}
-        	catch (CourierException ex)
-        	{
-        	    throw new ConnectionException(ex);
-        	}
-        	
-                if(jmsConnectionPool == null) {
-                    String sFactoryClass;
-                    String sType;
-                    Properties properties;
-                    String username;
-                    String password;
-                    boolean transacted;
+        synchronized(this) {
+            if(jmsConnectionPool == null) {
+                String sFactoryClass;
+                Properties properties;
+                String username;
+                String password;
 
-                    try {
-                        sFactoryClass = _epr.getConnectionFactory();
-                        sType = _epr.getDestinationType();
-                        properties = _epr.getJndiEnvironment();
-                        username = _epr.getJMSSecurityPrincipal();
-                        password = _epr.getJMSSecurityCredential();
-                        transacted = _epr.getTransacted();
-                    } catch (URISyntaxException e) {
-                        throw new ConnectionException("Unexpected exception while getting JMS connection pool.", e);
-                    }
+                try {
+                    sFactoryClass = _epr.getConnectionFactory();
+                    properties = _epr.getJndiEnvironment();
+                    username = _epr.getJMSSecurityPrincipal();
+                    password = _epr.getJMSSecurityCredential();
+                } catch (URISyntaxException e) {
+                    throw new ConnectionException("Unexpected exception while getting JMS connection pool.", e);
+                }
 
-                    if (Util.isNullString(sFactoryClass)) {
-                        sFactoryClass = "ConnectionFactory";
-                    }
-
-                    /*
-                     * Needs to be a one-shot instance if transactional.
-                     */
-                    
-                    Object tx = null;
-                    
-                    try
-                    {
-                	TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
-                	
-                	tx = txS.getTransaction();
-                    }
-                    catch (TransactionStrategyException ex)
-                    {
-                	_logger.warn("Problem getting transaction strategy: ", ex);
-                	
-                	throw new ConnectionException(ex);
-                    }
-
-                    if (tx == null)
-                	jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted);
-                    else
-                	jmsConnectionPool = XaJmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, true, tx);  // force transacted to be true!
+                if (Util.isNullString(sFactoryClass)) {
+                    sFactoryClass = "ConnectionFactory";
                 }
+
+                jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, username, password);
             }
-        }
 
-        return jmsConnectionPool;
+            return jmsConnectionPool;
+        }
     }
-
-    private void reset ()
-    {
-	/*
-	 * Are we in a global transaction?
-	 */
-	
-	if (jmsConnectionPool instanceof XaJmsConnectionPool)
-	{
-	    /*
-	     * If the global transaction has terminated then this pool instance has been
-	     * closed and we need to get another one.
-	     */
-	    
-	    if (!((XaJmsConnectionPool) jmsConnectionPool).active())
-	    {
-		jmsConnectionPool = null;
-		_messageProducer = null;
-		_messageConsumer = null;
-		jmsSession = null;
-	    }
-	}
-    }
     
-    private void checkTransaction () throws CourierException
-    {
-	try
-	{
-        	TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
-		Object txHandle = txStrategy.getTransaction();
-		boolean isActive = txStrategy.isActive();
-		
-		transactional = (txHandle != null);
-		
-		/*
-		 * Make sure the current transaction is still active! If we
-		 * have previously slept, then the timeout may be longer than that
-		 * associated with the transaction.
-		 */
-		
-		if (transactional && !isActive)
-		{
-			throw new CourierException("Associated transaction is no longer active!");
-		}
-	}
-	catch (TransactionStrategyException ex)
-	{
-	    throw new CourierException(ex);
-	}
-    }
-    
     public Message pickup(long millis) throws CourierException, CourierTimeoutException {
         javax.jms.Message jmsMessage = pickupPayload(millis);
 
@@ -520,9 +406,10 @@
             throw new CourierException("This is an outgoing-only Courier");
         if (millis < 1)
             throw new IllegalArgumentException("Timeout millis must be > 0");
-        if (null == _messageConsumer) try {
-            createMessageConsumer();
-        }
+        synchronized(this) {
+            if (null == _messageConsumer) try {
+                createMessageConsumer();
+            }
         catch (Exception e) {
             try {
                 Thread.sleep(1000); // TODO magic number
@@ -531,21 +418,23 @@
             }
             throw new CourierException("Unable to create Message Consumer", e);
         }
+        }
 
         javax.jms.Message jmsMessage = null;
-        while (null != _messageConsumer) {
-            checkTransaction();
-            
-            try {
-                jmsMessage = _messageConsumer.receive(millis);
-                break;
+        synchronized(this) {
+            while (null != _messageConsumer) {
+                try {
+                    jmsMessage = _messageConsumer.receive(millis);
+                    break;
+                }
+                catch (JMSException e) {
+                    if (!jmsConnectRetry(e))
+                        throw new CourierException("Caught exception during receive and could not reconnect! ",e);
+                }
+                catch (Exception e) {
+                    throw new CourierException(e);
+                }
             }
-            catch (JMSException e) {
-                jmsConnectRetry(e);
-            }
-            catch (Exception e) {
-                throw new CourierException(e);
-            }
         }
         return jmsMessage;
     } // ________________________________
@@ -594,68 +483,64 @@
     private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
         Context oJndiCtx = null;
 
-        reset();
-        
-        if (_messageConsumer == null) {
-            synchronized(this) {
-                if (_messageConsumer == null) {
-                    boolean success = false;
-                    try {
-                        Properties environment = _epr.getJndiEnvironment();
-                        oJndiCtx = NamingContextPool.getNamingContext(environment);
-                        try
-                        {
-                            String sType = _epr.getDestinationType();
-                            if (JMSEpr.QUEUE_TYPE.equals(sType)) {
-                                QueueSession qSess = (QueueSession) getJmsSession(_epr.getAcknowledgeMode());
-                                javax.jms.Queue queue = null;
+        synchronized(this) {
+            if (_messageConsumer == null) {
+                boolean success = false;
+                try {
+                    Properties environment = _epr.getJndiEnvironment();
+                    oJndiCtx = NamingContextPool.getNamingContext(environment);
+                    try
+                    {
+                        String sType = _epr.getDestinationType();
+                        if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+                            Session qSess = getJmsSession(_epr.getAcknowledgeMode());
+                            javax.jms.Queue queue = null;
+                            try {
+                                queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
+                                        .getDestinationName());
+                            } catch (NamingException ne) {
                                 try {
+                                    oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
                                     queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
                                             .getDestinationName());
-                                } catch (NamingException ne) {
-                                    try {
-                                        oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
-                                        queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
-                                                .getDestinationName());
-                                    } catch (NamingException nex) {
-                                        //ActiveMQ
-                                        queue = qSess.createQueue(_epr.getDestinationName());
-                                    }
-                                }
-                                _messageConsumer = qSess.createReceiver(queue, _epr.getMessageSelector());
-                            } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
-                                TopicSession tSess = (TopicSession) getJmsSession(_epr.getAcknowledgeMode());
-                                Topic topic = null;
-                                try {
-                                    topic = (Topic) oJndiCtx.lookup(_epr.getDestinationName());
                                 } catch (NamingException nex) {
                                     //ActiveMQ
-                                    topic = tSess.createTopic(_epr.getDestinationName());
+                                    queue = qSess.createQueue(_epr.getDestinationName());
                                 }
-                                
-                                _messageConsumer = tSess.createConsumer(topic, _epr
-                                        .getMessageSelector());
-                            } else {
-                                throw new CourierException("Unknown destination type");
                             }
-                            success = true;
-                        } finally {
-                            NamingContextPool.releaseNamingContext(oJndiCtx) ;
+                            _messageConsumer = qSess.createConsumer(queue, _epr.getMessageSelector());
+                        } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+                            Session tSess = getJmsSession(_epr.getAcknowledgeMode());
+                            Topic topic = null;
+                             try {
+                                   topic = (Topic) oJndiCtx.lookup(_epr
+                                                            .getDestinationName());
+                             }
+                             catch (NamingException ne) {
+                                   topic = tSess.createTopic(_epr.getDestinationName());
+                             }
+                                  _messageConsumer = tSess.createConsumer(topic, _epr
+                                         .getMessageSelector());
+                        } else {
+                            throw new CourierException("Unknown destination type");
                         }
+                        success = true;
+                    } finally {
+                        NamingContextPool.releaseNamingContext(oJndiCtx) ;
                     }
-                    catch (JMSException ex) {
-                        _logger.debug("Error from JMS system.", ex);
+                }
+                catch (JMSException ex) {
+                    _logger.debug("Error from JMS system.", ex);
 
-                        throw new CourierException(ex);
+                    throw new CourierException(ex);
+                }
+                catch (URISyntaxException ex) {
+                    throw new MalformedEPRException(ex);
+                }
+                finally {
+                    if (!success) {
+                        closeSession();
                     }
-                    catch (URISyntaxException ex) {
-                        throw new MalformedEPRException(ex);
-                    }
-                    finally {
-                        if (!success) {
-                            closeSession();
-                        }
-                    }
                 }
             }
         }
@@ -671,17 +556,15 @@
 
     protected String _messageSelector;
 
-    protected volatile Session jmsSession;
+    protected JmsSession jmsSession;
 
-    protected volatile MessageProducer _messageProducer;
+    protected MessageProducer _messageProducer;
 
-    protected volatile MessageConsumer _messageConsumer;
+    protected MessageConsumer _messageConsumer;
 
     protected List<KeyValuePair> _messageProperties;
 
-    protected volatile JmsConnectionPool jmsConnectionPool;
-
-    private boolean transactional = false;
+    protected JmsConnectionPool jmsConnectionPool;
     
     /**
      * Strategy for setting JMSProperties

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -53,13 +53,6 @@
 public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
 {
 	/**
-	 * disable default constructor
-	 */
-	private SqlTableCourier()
-	{
-	}
-
-	/**
 	 * package protected constructor - Objects of Courier should only be
 	 * instantiated by the Factory
 	 * 
@@ -149,7 +142,7 @@
 
 		try
 		{
-        		TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+        		TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
         		Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
         		boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
         		
@@ -237,7 +230,7 @@
 		{
 			try
 			{
-				TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+				TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
 				Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
 				boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
 				

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -28,15 +28,12 @@
 import java.util.Properties;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueSession;
 import javax.jms.Session;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -74,22 +71,40 @@
     private int SLEEP_TIME = DEFAULT_SLEEP;
     
     /** Number of free sessions in the pool that can be given out. Indexed by session key */
-    private Map<Integer,ArrayList<Session>> freeSessionsMap = new HashMap<Integer,ArrayList<Session>>();
+    private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
     
     /** Number of session that are currently in use. Indexed by session key mode */
-    private Map<Integer,ArrayList<Session>> inUseSessionsMap = new HashMap<Integer,ArrayList<Session>>();
+    private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
     
     /** Reference to a Queue or Topic Connection, we only need one per pool */
-    protected Connection jmsConnection = null;
+    protected Connection jmsConnection ;
     
     
     /** The Indentifier of the pool */
     private Map<String, String> poolKey;
-    
+
+    /**
+     * Mapping from transactions to sessions.
+     */
+    private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+    /**
+     * Mapping from sessions to transactions.
+     */
+    private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+
     /** Logger */
     private Logger logger = Logger.getLogger(this.getClass());
     
     /**
+     * The flag representing XA aware connections.
+     */
+    private boolean isXAAware ;
+    /**
+     * Flag signifying that the pool has been terminated.
+     */
+    private boolean terminated ;
+    
+    /**
      * Contructor of the pool.
      * 
      */
@@ -105,81 +120,35 @@
         MAX_SESSIONS = poolSize;
         SLEEP_TIME = sleepTime;
         
-        freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<Session>() );
-        freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<Session>() );
-        freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<Session>() );
+        freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+        freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+        freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
         
-        inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<Session>() );
-        inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<Session>() );
-        inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<Session>() );
+        inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+        inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+        inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
     }
    
     /**
      * This is where we create the sessions. 
      * 
      * @param poolKey
-     * @throws NamingException
+     * @param transacted true to create an XA session, false to create a standard session
      * @throws JMSException
-     * @throws ConnectionException
      */
-    private  synchronized void addAnotherSession(Map<String, String> poolKey, final int acknowledgeMode)
-    throws NamingException, JMSException, ConnectionException, NamingContextException
+    private  synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
+        throws JMSException
     {
-        String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
-
-        //Setup a connection if we don't have one
-        if (jmsConnection==null) {
-            JmsConnectionPoolContainer.addToPool(poolKey, this);
-            logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
-            Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
-            Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
-            try {
-                String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
-                Object factoryConnection=null;
-
-                try
-                {
-                    factoryConnection = jndiContext.lookup(overrideName(connectionFactoryString));
-                } catch (NamingException ne) {
-                    logger.info("Received NamingException, refreshing context.");
-                    jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
-                    factoryConnection = jndiContext.lookup(connectionFactoryString);
-                }
-                final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
-                final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
-                boolean useJMSSecurity = (username != null && password != null);
-                logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
-                
-                if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
-                    QueueConnectionFactory factory = (QueueConnectionFactory) factoryConnection;
-                    jmsConnection = useJMSSecurity ? factory.createQueueConnection(username,password): factory.createQueueConnection();
-                } else {
-                    TopicConnectionFactory factory = (TopicConnectionFactory) factoryConnection;
-                    jmsConnection = useJMSSecurity ? factory.createTopicConnection(username,password): factory.createTopicConnection();
-                }
-                
-                addExceptionListener();
-                
-                jmsConnection.start();
-            } finally {
-                NamingContextPool.releaseNamingContext(jndiContext) ;
-            }
-        }
-        final boolean transacted = Boolean.valueOf(poolKey.get(JMSEpr.TRANSACTED_TAG));
-        
         //Create a new Session
-        ArrayList<Session> freeSessions = freeSessionsMap.get( acknowledgeMode );
-
-        if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
-            QueueSession session = ((QueueConnection)jmsConnection).createQueueSession(transacted,acknowledgeMode);
-                    
-            freeSessions.add(session);
-        } else if (JMSEpr.TOPIC_TYPE.equals(destinationType)) {
-            TopicSession session = ((TopicConnection) jmsConnection).createTopicSession(transacted,acknowledgeMode);
-            freeSessions.add(session);
+        ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
+        // For now we only support JTA transacted sessions
+        final JmsSession session ;
+        if (transacted) {
+            session = new JmsXASession(this, ((XAConnection)jmsConnection).createXASession());
         } else {
-            throw new ConnectionException("Unknown destination type");
+            session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
         }
+        freeSessions.add(session);
         logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
     }
 
@@ -189,14 +158,35 @@
      * @return Connection to be used
      * @throws ConnectionException
      */
-    public synchronized Session getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
+    public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
     {
+        try {
+            initConnection() ;
+        } catch (final NamingContextException nce) {
+            throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
+        }
+        final boolean transacted ;
+        try {
+            transacted = (isXAAware && TransactionStrategy.getTransactionStrategy(true).isActive()) ;
+        } catch (final TransactionStrategyException tse) {
+            throw new ConnectionException("Failed to determine current transaction context", tse) ;
+        }
+        
+        if (transacted)
+        {
+            final JmsXASession currentSession = getXASession() ;
+            if (currentSession != null)
+            {
+                return currentSession ;
+            }
+        }
+        final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
+        
         final long end = System.currentTimeMillis() + (SLEEP_TIME * 1000) ;
         boolean emitExpiry = logger.isDebugEnabled() ;
         for(;;) {
-        	
-        	ArrayList<Session> freeSessions = freeSessionsMap.get(acknowledgeMode );
-        	ArrayList<Session> inUseSessions = inUseSessionsMap.get(acknowledgeMode);
+            ArrayList<JmsSession> freeSessions = freeSessionsMap.get(mode );
+            ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
             if (freeSessions.size() > 0)
             {
                 if (logger.isDebugEnabled()) {
@@ -204,16 +194,11 @@
                             + ", maxsize=" + MAX_SESSIONS 
                             + ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
                 }
-                final Session session = freeSessions.remove(freeSessions.size()-1);
+                final JmsSession session = freeSessions.remove(freeSessions.size()-1);
                 inUseSessions.add(session);
                 return session ;
             } else if (inUseSessions.size()<MAX_SESSIONS) {
-                try {
-                    addAnotherSession(poolKey,acknowledgeMode);
-                } catch (final NamingContextException nce) {
-                    throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
-                }
-                
+                addAnotherSession(poolKey, transacted, mode);
                 continue ;
             } else {
                 if (emitExpiry)
@@ -239,74 +224,106 @@
         }
     }
     /**
-     *  This method can be called whenever a Queue Session is needed from the pool.
+     *  This method can be called whenever a Session is needed from the pool.
      * @return
      * @throws NamingException
      * @throws JMSException
      * @throws ConnectionException
      */
-    public QueueSession getQueueSession() throws NamingException, JMSException, ConnectionException
+    public JmsSession getSession() throws NamingException, JMSException, ConnectionException
     {
-        return (QueueSession) getQueueSession(Session.AUTO_ACKNOWLEDGE);
+        return getSession(Session.AUTO_ACKNOWLEDGE);
     }
     
-    public QueueSession getQueueSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
-    {
-        return (QueueSession) getSession(acknowledgeMode);
+    /**
+     * This method closes an open session and returns the session to the pool.
+     * @param session The session to be returned to the pool.
+     * @throws SQLException
+     * @deprecated
+     */
+    public void closeSession(Session session){
+        if (session instanceof JmsSession) {
+            closeSession((JmsSession)session) ;
+        } else {
+            logger.error("Invalid JMS Session type in closeSession: " + session);
+        }
     }
     
     /**
-     * This method can be called whenever a Topic Session is needed from the pool.
-     * @return
-     * @throws NamingException
-     * @throws JMSException
-     * @throws ConnectionException
+     * This method closes an open session and returns the session to the pool.
+     * @param session The session to be returned to the pool.
+     * @throws SQLException
      */
-    public TopicSession getTopicSession() throws NamingException, JMSException, ConnectionException
+    public void closeSession(JmsSession session){
+        session.handleCloseSession(this) ;
+    }
+    
+    /**
+     * Handle the real work of closing the session.
+     * @param session The session to close.
+     */
+    synchronized void handleCloseSession(final JmsSession session)
     {
-        return (TopicSession) getTopicSession(Session.AUTO_ACKNOWLEDGE);
+        final int mode ;
+        try {
+            mode = session.getAcknowledgeMode() ;
+        } catch (final JMSException jmse) {
+            logger.warn("JMSException while calling getAcknowledgeMode") ;
+            logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+            return ;
+        }
+        
+        final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+        if (sessions != null) {
+            sessions.add(session) ;
+        }
+        handleReleaseSession(session) ;
     }
     
-    public TopicSession getTopicSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
+    /**
+     * Handle the real work of releasing the session.
+     * @param session The session to release.
+     */
+    synchronized void handleReleaseSession(final JmsSession session)
     {
-        return (TopicSession) getSession(acknowledgeMode);
+        session.releaseResources() ;
+        final int mode ;
+        try {
+            mode = session.getAcknowledgeMode() ;
+        } catch (final JMSException jmse) {
+            logger.warn("JMSException while calling getAcknowledgeMode") ;
+            logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+            return ;
+        }
+        
+        final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
+        if (sessions != null) {
+            sessions.remove(session) ;
+        }
+        notifyAll() ;
     }
-
+    
     /**
-     * This method closes an open connection and returns the connection to the pool.
-     * @param sessionToClose The connection to be returned to the pool.
+     * This method closes an open session without returning it to the pool.
+     * @param session The session to be released; it is not returned to the free pool.
      * @throws SQLException
      */
-    public  synchronized void closeSession(Session sessionToClose){
-		try
-		{
-        	ArrayList<Session> sessions = freeSessionsMap.get(sessionToClose.getAcknowledgeMode());
-        	if ( sessions != null )
-	        	sessions.add(sessionToClose);
-        	
-            releaseSession(sessionToClose) ;
-		} catch (JMSException e)
-		{
-			logger.error("JMSException while calling getAcknowledgeMode", e);
-		}
+    public synchronized void releaseSession(final JmsSession session) {
+        session.handleReleaseSession(this) ;
     }
     
     /**
      * This method closes an open session without returning it to the pool.
-     * @param sessionToClose The session to be returned to the pool.
+     * @param session The session to be released; it is not returned to the free pool.
      * @throws SQLException
+     * @deprecated
      */
-    public synchronized void releaseSession(final Session sessionToClose) {
-    	try
-		{
-			ArrayList<Session> inUseSessions = inUseSessionsMap.get(sessionToClose.getAcknowledgeMode());
-			if ( inUseSessions != null )
-	            inUseSessions.remove(sessionToClose);
-            notifyAll() ;
-		} catch (JMSException e)
-		{
-			logger.error("JMSException while calling getAcknowledgeMode", e);
-		}
+    public synchronized void releaseSession(final Session session) {
+      if (session instanceof JmsSession) {
+          releaseSession((JmsSession)session) ;
+      } else {
+          logger.error("Invalid JMS Session type in releaseSession: " + session);
+      }
     }
 
     /**
@@ -315,20 +332,18 @@
      */
     public synchronized void removeSessionPool()
     {
-        freeSessionsMap.get(Session.AUTO_ACKNOWLEDGE).clear() ;
-        freeSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).clear() ;
-        freeSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).clear() ;
+        freeSessionsMap = null ;
+        inUseSessionsMap = null ;
+        transactionsToSessions = null ;
+        sessionsToTransactions = null ;
         
-        inUseSessionsMap.get(Session.AUTO_ACKNOWLEDGE).clear() ;
-        inUseSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).clear() ;
-        inUseSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).clear() ;
-        
         logger.debug("Emptied the session pool now closing the connection to the factory.");
         if (jmsConnection!=null) {
             try {
                 jmsConnection.close();
             } catch (final Exception ex) {} // ignore
             jmsConnection=null;
+            terminated = true ;
         }
         JmsConnectionPoolContainer.removePool(poolKey);
     }
@@ -338,13 +353,10 @@
      * @return the session pool size
      */
     public int getSessionsInPool() {
-    	int nrOfSessions = freeSessionsMap.get(Session.AUTO_ACKNOWLEDGE).size();
-    	nrOfSessions += freeSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).size();
-    	nrOfSessions += freeSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).size();
-    	nrOfSessions += inUseSessionsMap.get(Session.AUTO_ACKNOWLEDGE).size();
-    	nrOfSessions += inUseSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).size();
-    	nrOfSessions += inUseSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).size();
-    	return nrOfSessions;
+        return getSessionsInPool(Session.AUTO_ACKNOWLEDGE) +
+            getSessionsInPool(Session.CLIENT_ACKNOWLEDGE) +
+            getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE) +
+            getSessionsInPool(Session.SESSION_TRANSACTED) ;
     }
     
     /**
@@ -353,8 +365,8 @@
      * @param acknowledgeMode the acknowledge mode of sessions
      * @return
      */
-    public int getSessionsInPool(final int acknowledgeMode) {
-        return freeSessionsMap.get(acknowledgeMode).size() + inUseSessionsMap.get(acknowledgeMode).size();
+    public synchronized int getSessionsInPool(final int acknowledgeMode) {
+        return getFreeSessionsInPool(acknowledgeMode) + getInUseSessionsInPool(acknowledgeMode) ;
     }
     
     /**
@@ -362,8 +374,10 @@
      * @param acknowledgeMode the acknowledge mode of sessions
      * @return int	the number of in use sessions
      */
-    public int getFreeSessionsInPool(final int acknowledgeMode) {
-        return freeSessionsMap.get(acknowledgeMode).size();
+    public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
+        final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
+        final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
+        return numFreeSessions;
     }
     
     /**
@@ -372,25 +386,126 @@
      * @param acknowledgeMode the acknowledge mode of sessions
      * @return int	the number of in use sessions
      */
-    public int getInUseSessionsInPool(final int acknowledgeMode) {
-        return inUseSessionsMap.get(acknowledgeMode).size();
+    public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
+        final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
+        final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
+        return numInUseSessions;
     }
     
-    protected String overrideName (String name) throws ConnectionException
+    /**
+     * Initialise the connection.
+     * @throws ConnectionException If the pool has already been terminated.
+     * @throws NamingContextException for errors obtaining a naming context
+     * @throws NamingException for errors accessing a naming context
+     * @throws JMSException for errors creating the connection
+     */
+    private synchronized void initConnection()
+        throws ConnectionException, NamingContextException, NamingException, JMSException
     {
-	return name;
+        if (terminated)
+        {
+            throw new ConnectionException("Connection pool has been terminated") ;
+        }
+        
+        if (jmsConnection==null) {
+            JmsConnectionPoolContainer.addToPool(poolKey, this);
+            logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
+            Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
+            Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
+            try {
+                String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
+                Object factoryConnection=null;
+
+                try
+                {
+                    factoryConnection = jndiContext.lookup(connectionFactoryString);
+                } catch (NamingException ne) {
+                    logger.info("Received NamingException, refreshing context.");
+                    jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
+                    factoryConnection = jndiContext.lookup(connectionFactoryString);
+                }
+                final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
+                final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
+                boolean useJMSSecurity = (username != null && password != null);
+                logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
+                if (factoryConnection instanceof XAConnectionFactory) {
+                    final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
+                    jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
+                    isXAAware = true ;
+                    freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+                    inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+                } else if (factoryConnection instanceof ConnectionFactory) {
+                    final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
+                    jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
+                }
+                
+                jmsConnection.setExceptionListener(new ExceptionListener() {
+                    public void onException(JMSException arg0)
+                    {
+                        removeSessionPool() ;
+                    }
+                }) ;
+                jmsConnection.start();
+            } finally {
+                NamingContextPool.releaseNamingContext(jndiContext) ;
+            }
+        }
     }
     
-    protected void addExceptionListener () throws JMSException, ConnectionException
+    /**
+     * Get the current transaction.
+     * @return The transaction or null if none present.
+     * @throws ConnectionException if the transaction context cannot be obtained.
+     */
+    private Object getTransaction()
+        throws ConnectionException
     {
-        jmsConnection.setExceptionListener(new ExceptionListener() {
-            public void onException(JMSException arg0)
-            {
-                removeSessionPool() ;
-            }
-        }) ;
+        try {
+            return TransactionStrategy.getTransactionStrategy(true).getTransaction() ;
+        } catch (final TransactionStrategyException tse) {
+            throw new ConnectionException("Failed to determine current transaction context", tse) ;
+        }
     }
     
+    /**
+     * Get a JMS session associated with the current transaction.
+     * @return The JMS session or null if not associated. 
+     * @throws ConnectionException For errors accessing the current transaction context
+     */
+    private synchronized JmsXASession getXASession()
+        throws ConnectionException
+    {
+        final Object tx = getTransaction() ;
+        return transactionsToSessions.get(tx) ;
+    }
+    
+    /**
+     * Associate the JMS XA Session with the current transaction.
+     * @param session The XA session.
+     * @throws ConnectionException if there is no transaction active.
+     */
+    synchronized void associateTransaction(final JmsXASession session)
+        throws ConnectionException
+    {
+        final Object tx = getTransaction() ;
+        if (tx == null)
+        {
+            throw new ConnectionException("No active transaction") ;
+        }
+        transactionsToSessions.put(tx, session) ;
+        sessionsToTransactions.put(session, tx) ;
+    }
+    
+    /**
+     * Disassociate the JMS XA Session from a transaction.
+     * @param session The XA session.
+     */
+    synchronized void disassociateTransaction(final JmsXASession session)
+    {
+        final Object tx = sessionsToTransactions.remove(session) ;
+        transactionsToSessions.remove(tx) ;
+    }
+    
     static
     {
     	PropertyManager prop = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE);

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -70,50 +70,38 @@
     {
         return getPool(jmsEpr.getJndiEnvironment(), 
         		jmsEpr.getConnectionFactory(), 
-        		jmsEpr.getDestinationType(), 
                 jmsEpr.getJMSSecurityPrincipal(), 
-                jmsEpr.getJMSSecurityCredential(),
-                jmsEpr.getTransacted());
+                jmsEpr.getJMSSecurityCredential()) ;
     }
     /**
      * Returns the pool given the identifiers for the JMS provider.
      * 
      * @param enviroment 			- JNDI environment for which a JMSConnectionPool should be retreived
      * @param connectionFactory		- connectionfactory for which a JMSConnectionPool should be retreived
-     * @param destinationType		- destinationType(Queue or Topic) for which a JMSConnectionPool should be retreived
      * @return <code>JmsConnectionPool</code>
      * @throws ConnectionException
      */
-    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType)
+    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory)
         throws ConnectionException
     {
-    	return getPool(enviroment, connectionFactory, destinationType, null, null, false);
+    	return getPool(enviroment, connectionFactory, null, null);
     }
-    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
-    		final String username, final String password)
-        throws ConnectionException
-    {
-    	return getPool( enviroment, connectionFactory, destinationType, username, password, false );
-    	
-    }
     
     /**
      * Returns the pool given the identifiers for the JMS provider.
      * 
      * @param enviroment 			- JNDI evironment for which a JMSConnectionPool should be retreived
      * @param connectionFactory		- connectionfactory for which a JMSConnectionPool should be retreived
-     * @param destinationType		- destinationType(Queue or Topic) for which a JMSConnectionPool should be retreived
      * @param username				- username that should be used to create the JMS Connection
      * @param password				- password that should be used to create the JMS Connection
-     * @param transacted			- should the JMS Session be transacted
      * @return <code>JmsConnectionPool</code>
      * @throws ConnectionException
      */
-    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
-    		final String username, final String password, final boolean transacted)
+    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory,
+    		final String username, final String password)
         throws ConnectionException
     {
-        Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, destinationType, username, password, transacted);
+        Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, username, password);
         final Map<Map<String, String>, JmsConnectionPool> poolMap = getMap() ;
   
         if (poolMap.containsKey(poolKey)) {
@@ -131,25 +119,23 @@
      * 
      * @param environment - the JNDI environment parameters
      * @param connectionFactory
-     * @param destinationType
      * @return
      */
-    public static Map<String, String> createPoolKey(Properties  environment, String connectionFactory, String destinationType) 
+    public static Map<String, String> createPoolKey(Properties  environment, String connectionFactory) 
     {
-    	return createPoolKey( environment, connectionFactory, destinationType, null, null, false );
+    	return createPoolKey( environment, connectionFactory, null, null );
     }
     /**
      * Creates a poolKey using the identifying parameters
      * 
      * @param environment - the JNDI environment parameters
      * @param connectionFactory
-     * @param destinationType
      * @param username		the JMS username to be used. Used with  {@link javax.jms.ConnectionFactory } createConnection
      * @param password		the JMS password to be used. Used with  {@link javax.jms.ConnectionFactory } createConnection
      * @return
      */
-    public static Map<String, String> createPoolKey(Properties  environment, String connectionFactory, String destinationType,
-    		final String username, final String password, final boolean transacted) 
+    public static Map<String, String> createPoolKey(Properties  environment, String connectionFactory,
+    		final String username, final String password) 
     {
         Map<String,String> poolKey = new HashMap<String,String>();
         if (environment!=null) {
@@ -164,10 +150,7 @@
         }
         
         if (connectionFactory!=null)  poolKey.put(JMSEpr.CONNECTION_FACTORY_TAG, connectionFactory);
-        if (destinationType!=null)    poolKey.put(JMSEpr.DESTINATION_TYPE_TAG, destinationType);
         
-        poolKey.put(JMSEpr.TRANSACTED_TAG, String.valueOf(transacted));
-        
         return poolKey;
     }
     /**

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -0,0 +1,380 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.io.Serializable;
+import java.util.HashSet;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Wrapper for a JMS session: forwards calls to a delegate session, tracks the browsers, consumers and producers created through it, and exposes the close/release hooks that call back into JmsConnectionPool.
+ */
+public class JmsSession implements Session
+{
+    /**
+     * The session delegate.
+     */
+    private final Session session ;
+    
+    /**
+     * The set of active queue browsers (null until the first browser is tracked).
+     */
+    private HashSet<QueueBrowser> queueBrowserSet ;
+    /**
+     * The set of active message consumers, including topic subscribers (null until the first one is tracked).
+     */
+    private HashSet<MessageConsumer> messageConsumerSet ;
+    /**
+     * The set of active message producers (null until the first producer is tracked).
+     */
+    private HashSet<MessageProducer> messageProducerSet ;
+    
+    /**
+     * Create the session wrapper.
+     * @param session The session delegate.
+     *        Session calls made on this wrapper are forwarded to it.
+     * @throws JMSException Declared on the signature; nothing in this constructor body throws it.
+     */
+    JmsSession(final Session session)
+        throws JMSException
+    {
+        this.session = session ;
+    }
+
+    public void close() throws JMSException
+    {
+        session.close(); // forwarded directly; associate() is not called and tracked resources are not touched here
+    }
+
+    public void commit() throws JMSException
+    {
+        session.commit(); // forwarded directly; JmsXASession overrides this as a no-op
+    }
+
+    public QueueBrowser createBrowser(Queue arg0, String arg1)
+            throws JMSException
+    {
+        return trackQueueBrowser(session.createBrowser(arg0, arg1)); // created on the delegate, then registered for cleanup
+    }
+
+    public QueueBrowser createBrowser(Queue arg0) throws JMSException
+    {
+        return trackQueueBrowser(session.createBrowser(arg0)); // created on the delegate, then registered for cleanup
+    }
+
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        associate() ; // subclass hook (empty in this class) run before delegating
+        return session.createBytesMessage();
+    }
+
+    public MessageConsumer createConsumer(Destination arg0, String arg1,
+            boolean arg2) throws JMSException
+    {
+        return trackMessageConsumer(session.createConsumer(arg0, arg1, arg2)); // created on the delegate, then registered for cleanup
+    }
+
+    public MessageConsumer createConsumer(Destination arg0, String arg1)
+            throws JMSException
+    {
+        return trackMessageConsumer(session.createConsumer(arg0, arg1)); // created on the delegate, then registered for cleanup
+    }
+
+    public MessageConsumer createConsumer(Destination arg0) throws JMSException
+    {
+        return trackMessageConsumer(session.createConsumer(arg0)); // created on the delegate, then registered for cleanup
+    }
+
+    public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1,
+            String arg2, boolean arg3) throws JMSException
+    {
+        return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1, arg2, arg3)); // tracked alongside the message consumers
+    }
+
+    public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1)
+            throws JMSException
+    {
+        return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1)); // tracked alongside the message consumers
+    }
+
+    public MapMessage createMapMessage() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createMapMessage();
+    }
+
+    public Message createMessage() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createMessage();
+    }
+
+    public ObjectMessage createObjectMessage() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createObjectMessage();
+    }
+
+    public ObjectMessage createObjectMessage(Serializable arg0)
+            throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createObjectMessage(arg0);
+    }
+
+    public MessageProducer createProducer(Destination arg0) throws JMSException
+    {
+        return trackMessageProducer(session.createProducer(arg0)); // created on the delegate, then registered for cleanup
+    }
+
+    public Queue createQueue(String arg0) throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createQueue(arg0);
+    }
+
+    public StreamMessage createStreamMessage() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createStreamMessage();
+    }
+
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createTemporaryQueue();
+    }
+
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createTemporaryTopic();
+    }
+
+    public TextMessage createTextMessage() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createTextMessage();
+    }
+
+    public TextMessage createTextMessage(String arg0) throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createTextMessage(arg0);
+    }
+
+    public Topic createTopic(String arg0) throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        return session.createTopic(arg0);
+    }
+
+    public int getAcknowledgeMode() throws JMSException
+    {
+        associate() ; // even this read-only query runs the subclass hook first
+        return session.getAcknowledgeMode();
+    }
+
+    public MessageListener getMessageListener() throws JMSException
+    {
+        associate() ; // even this read-only query runs the subclass hook first
+        return session.getMessageListener();
+    }
+
+    public boolean getTransacted() throws JMSException
+    {
+        associate() ; // even this read-only query runs the subclass hook first
+        return session.getTransacted();
+    }
+
+    public void recover() throws JMSException
+    {
+        associate() ; // subclass hook run before delegating; releaseResources() also reaches this path
+        session.recover();
+    }
+
+    public void rollback() throws JMSException
+    {
+        session.rollback(); // forwarded directly; JmsXASession overrides this to go through the TransactionStrategy
+    }
+
+    public void run()
+    {
+        session.run(); // forwarded directly; associate() is not called
+    }
+
+    public void setMessageListener(MessageListener arg0) throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        session.setMessageListener(arg0);
+    }
+
+    public void unsubscribe(String arg0) throws JMSException
+    {
+        associate() ; // subclass hook run before delegating
+        session.unsubscribe(arg0);
+    }
+
+    private synchronized QueueBrowser trackQueueBrowser(QueueBrowser queueBrowser) // registers a browser that the delegate has already created
+        throws JMSException
+    {
+        associate() ; // note: runs after the delegate created the browser, not before
+        if (queueBrowserSet == null)
+        {
+            queueBrowserSet = new HashSet<QueueBrowser>() ; // set is created lazily on first use
+        }
+        final QueueBrowser result = getQueueBrowser(queueBrowser) ; // subclass may substitute a wrapper
+        queueBrowserSet.add(result) ; // the (possibly wrapped) instance is what gets tracked and returned
+        return result ;
+    }
+
+    private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer) // registers a consumer that the delegate has already created
+        throws JMSException
+    {
+        associate() ; // note: runs after the delegate created the consumer, not before
+        if (messageConsumerSet == null)
+        {
+            messageConsumerSet = new HashSet<MessageConsumer>() ; // set is created lazily on first use
+        }
+        final MessageConsumer result = getMessageConsumer(messageConsumer) ; // subclass may substitute a wrapper
+        messageConsumerSet.add(result) ; // the (possibly wrapped) instance is what gets tracked and returned
+        return result ;
+    }
+
+    private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber) // registers a subscriber that the delegate has already created
+        throws JMSException
+    {
+        associate() ; // note: runs after the delegate created the subscriber, not before
+        if (messageConsumerSet == null)
+        {
+            messageConsumerSet = new HashSet<MessageConsumer>() ; // subscribers share the consumer set; created lazily
+        }
+        final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ; // subclass may substitute a wrapper
+        messageConsumerSet.add(result) ; // the (possibly wrapped) instance is what gets tracked and returned
+        return result ;
+    }
+
+    private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer) // registers a producer that the delegate has already created
+        throws JMSException
+    {
+        associate() ; // note: runs after the delegate created the producer, not before
+        if (messageProducerSet == null)
+        {
+            messageProducerSet = new HashSet<MessageProducer>() ; // set is created lazily on first use
+        }
+        final MessageProducer result = getMessageProducer(messageProducer) ; // subclass may substitute a wrapper
+        messageProducerSet.add(result) ; // the (possibly wrapped) instance is what gets tracked and returned
+        return result ;
+    }
+
+    synchronized void releaseResources() // best-effort: closes every tracked browser, consumer and producer, then calls recover()
+    {
+        if (queueBrowserSet != null)
+        {
+            for(QueueBrowser queueBrowser: queueBrowserSet)
+            {
+                try {
+                    queueBrowser.close() ;
+                } catch (final JMSException jmse) {} // ignore
+            }
+            queueBrowserSet = null ; // forget the browsers whether or not each close succeeded
+        }
+        if (messageConsumerSet != null)
+        {
+            for(MessageConsumer messageConsumer: messageConsumerSet)
+            {
+                try {
+                    messageConsumer.close() ;
+                } catch (final JMSException jmse) {} // ignore
+            }
+            messageConsumerSet = null ; // forget the consumers whether or not each close succeeded
+        }
+        if (messageProducerSet != null)
+        {
+            for(MessageProducer messageProducer: messageProducerSet)
+            {
+                try {
+                    messageProducer.close() ;
+                } catch (final JMSException jmse) {} // ignore
+            }
+            messageProducerSet = null ; // forget the producers whether or not each close succeeded
+        }
+        try
+        {
+            recover() ; // goes through this wrapper's recover(), so associate() runs before the delegate is called
+        }
+        catch (final JMSException jmse) {} // ignore
+    }
+
+    protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser) // hook: returns the browser unchanged; JmsXASession overrides it to return a proxy
+    {
+        return queueBrowser ;
+    }
+
+    protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer) // hook: returns the consumer unchanged; JmsXASession overrides it to return a proxy
+    {
+        return messageConsumer ;
+    }
+
+    protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber) // hook: returns the subscriber unchanged; JmsXASession overrides it to return a proxy
+    {
+        return topicSubscriber ;
+    }
+
+    protected MessageProducer getMessageProducer(MessageProducer messageProducer) // hook: returns the producer unchanged; JmsXASession overrides it to return a proxy
+    {
+        return messageProducer ;
+    }
+
+    protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool) // hook: hands this session straight to the pool's close handling
+    {
+        jmsConnectionPool.handleCloseSession(this) ;
+    }
+
+    protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool) // hook: hands this session straight to the pool's release handling
+    {
+        jmsConnectionPool.handleReleaseSession(this) ;
+    }
+    
+    protected void associate() // hook called before most delegated operations; empty here, overridden by JmsXASession
+        throws JMSException
+    {
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -0,0 +1,230 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.TopicSubscriber;
+import javax.jms.XASession;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+
+/**
+ * Wrapper for a JMS XA session: before delegated operations it registers itself as a transaction synchronization and enlists the session's XAResource, and it defers the pool close/release until afterCompletion.
+ */
+class JmsXASession extends JmsSession implements Synchronization
+{
+    /**
+     * The connection pool.
+     */
+    private final JmsConnectionPool pool ;
+    
+    /**
+     * The session delegate (the same instance that is passed to the superclass), kept typed as XASession for getXAResource().
+     */
+    private final XASession session ;
+    
+    /**
+     * Flag indicating whether this session is associated with a transaction (set by associate(), cleared by afterCompletion()).
+     */
+    private boolean associated ;
+    
+    /**
+     * Cleanup actions
+     */
+    private enum Cleanup { close, release }
+    
+    /**
+     * The cleanup action for the synchronization; defaults to close and is applied in afterCompletion().
+     */
+    private Cleanup cleanupAction = Cleanup.close ;
+    
+    /**
+     * Create the session wrapper.
+     * @param pool The current connection pool
+     * @param session The session delegate.
+     *        It is also handed to the superclass constructor as its delegate.
+     * @throws JMSException Declared on the signature and by the superclass constructor.
+     */
+    JmsXASession(final JmsConnectionPool pool, final XASession session)
+        throws JMSException
+    {
+        super(session) ;
+        this.pool = pool ;
+        this.session = session ;
+    }
+    
+    @Override
+    public void commit() throws JMSException
+    {
+        // Handled by the transaction
+    }
+    
+    @Override
+    public void rollback() throws JMSException
+    {
+        try
+        {
+            TransactionStrategy.getTransactionStrategy(true).rollbackOnly() ; // does not roll back the delegate session itself
+        }
+        catch (final TransactionStrategyException tse)
+        {
+            final JMSException ex = new JMSException("Failed to rollback transaction") ;
+            ex.initCause(tse) ; // keep the strategy failure as the cause
+            throw ex ;
+        }
+    }
+    
+    @Override
+    protected MessageProducer getMessageProducer(MessageProducer messageProducer)
+    {
+        final InvocationHandler handler = new AssociationHandler(messageProducer) ; // calls associate() before each producer method
+        return (MessageProducer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageProducer.class}, handler );
+    }
+    
+    @Override
+    protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer)
+    {
+        final InvocationHandler handler = new AssociationHandler(messageConsumer) ; // calls associate() before each consumer method
+        return (MessageConsumer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageConsumer.class}, handler);
+    }
+    
+    @Override
+    protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
+    {
+        final InvocationHandler handler = new AssociationHandler(queueBrowser) ; // calls associate() before each browser method
+        return (QueueBrowser)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueBrowser.class}, handler);
+    }
+    
+    @Override
+    protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber)
+    {
+        final InvocationHandler handler = new AssociationHandler(topicSubscriber) ; // calls associate() before each subscriber method
+        return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
+    }
+    
+    protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool) // overrides JmsSession: only records the request, the pool argument is unused
+    {
+        cleanupAction = Cleanup.close ; // the pool is actually called from afterCompletion()
+    }
+    
+    protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool) // overrides JmsSession: only records the request, the pool argument is unused
+    {
+        cleanupAction = Cleanup.release ; // the pool is actually called from afterCompletion()
+    }
+    
+    protected synchronized void associate() // overrides the empty JmsSession hook; does nothing once the associated flag is set
+        throws JMSException
+    {
+        if (!associated)
+        {
+            final XAResource resource = session.getXAResource() ;
+            final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+            try
+            {
+                transactionStrategy.registerSynchronization(this) ; // registered first, so afterCompletion() can run even if enlisting below fails
+                transactionStrategy.enlistResource(resource) ;
+            }
+            catch (final TransactionStrategyException tse)
+            {
+                final JMSException ex = new JMSException("Failed to initialise transaction resources") ;
+                ex.initCause(tse) ; // keep the strategy failure as the cause
+                throw ex ;
+            }
+            try
+            {
+                pool.associateTransaction(this) ; // tell the pool about the association
+            }
+            catch (final ConnectionException ce)
+            {
+                final JMSException ex = new JMSException("Failed to associate session with the current transaction") ;
+                ex.initCause(ce) ; // keep the pool failure as the cause
+                throw ex ;
+            }
+            
+            associated = true ; // only reached when every step above succeeded
+        }
+    }
+    
+    public void beforeCompletion() // Synchronization callback: nothing to do before completion
+    {
+    }
+    
+    public synchronized void afterCompletion(final int result) // Synchronization callback: the result status is not inspected
+    {
+        pool.disassociateTransaction(this) ;
+        switch (cleanupAction) // apply whichever of close/release was recorded last (close if neither was requested)
+        {
+        case close:
+            pool.handleCloseSession(this) ;
+            break ;
+        case release:
+            pool.handleReleaseSession(this) ;
+            break ;
+        }
+        associated = false ; // allow a later associate() to run again; cleanupAction is left as it was
+    }
+    
+    /**
+     * Handler responsible for associating XA resources: it calls associate() on the enclosing session before every proxied method.
+     * @author kevin
+     */
+    private final class AssociationHandler implements InvocationHandler
+    {
+        /**
+         * The target instance.
+         */
+        private final Object target ;
+        
+        /**
+         * Construct the handler using the specified target.
+         * @param target The target instance.
+         */
+        public AssociationHandler(final Object target)
+        {
+            this.target = target ;
+        }
+        
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+        {
+            associate() ; // enclosing session's associate(); an exception here propagates before the target is invoked
+            try
+            {
+                return method.invoke(target, args);
+            }
+            catch (final InvocationTargetException ite)
+            {
+                throw ite.getCause() ; // rethrow the target's own exception rather than the reflection wrapper
+            }
+        }
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,7 +34,6 @@
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Queue;
-import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.naming.Context;
@@ -44,6 +43,7 @@
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.actions.ActionProcessingException;
 import org.jboss.soa.esb.addressing.EPR;
@@ -344,7 +344,7 @@
 	}
 
     private static class JMSSendQueueSetup {
-        Session jmsSession;
+        JmsSession jmsSession;
         Destination jmsDestination;
         MessageProducer jmsProducer;
         String destinationName;
@@ -377,11 +377,11 @@
             environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
             Context oCtx = NamingContextPool.getNamingContext(environment);
             try {
-                pool = JmsConnectionPoolContainer.getPool(environment, connectionFactoryName, JMSEpr.QUEUE_TYPE);
+                pool = JmsConnectionPoolContainer.getPool(environment, connectionFactoryName);
                 
                 this.destinationName = destinationName;
                 
-                jmsSession = pool.getQueueSession();
+                jmsSession = pool.getSession();
                 boolean clean = true ;
                 try {
                     try {

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -529,7 +529,6 @@
                 } catch (Throwable t) {
                     logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + targetEPR + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
                 } finally {
-                    // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
                     CourierUtil.cleanCourier(courier);
 
                     // put back the old To since we will have changed it.

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -30,6 +30,7 @@
 import javax.transaction.Synchronization;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
 
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.util.XMLHelper;
@@ -278,7 +279,25 @@
         	throw new TransactionStrategyException("Problem when registering synchronization: ", th);
             }
         }
+        
         /**
+         * Add a resource to the current transaction (the one returned by tm.getTransaction()).
+         * @param resource The XA resource to enlist.
+         * @throws TransactionStrategyException Wraps any Throwable raised while obtaining the transaction or enlisting the resource.
+         */
+        public void enlistResource(XAResource resource) throws TransactionStrategyException
+        {
+            try
+            {
+                tm.getTransaction().enlistResource(resource);
+            }
+            catch (final Throwable th)
+            {
+                throw new TransactionStrategyException("Problem when enlisting resource", th);
+            }
+        }
+        
+        /**
          * Is the currently associated transaction active?
          * @return
          * @throws TransactionStrategyException

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -23,6 +23,7 @@
 package org.jboss.soa.esb.common;
 
 import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
 
 /**
  * This class represents the transaction strategy that is currently in force
@@ -128,23 +129,13 @@
      */
     public abstract void registerSynchronization (Synchronization sync) throws TransactionStrategyException;
     
-    public static void setStrategy (TransactionStrategy txSt)
-    {
-    	_currentStrategy.set(txSt);
-    }
+    /**
+     * Add a resource to the current transaction.
+     * @param resource The XA resource to add.
+     * @throws TransactionStrategyException If the concrete strategy fails to add, or does not support adding, the resource.
+     */
+    public abstract void enlistResource(XAResource resource) throws TransactionStrategyException;
     
-    public static TransactionStrategy getStrategy ()
-    {
-    	return _currentStrategy.get();
-    }
-    
-    public static void removeStrategy ()
-    {
-    	_currentStrategy.remove();
-    }
-    
-    private final static ThreadLocal<TransactionStrategy> _currentStrategy = new ThreadLocal<TransactionStrategy>();
-    
     /**
      * The null transaction strategy.
      * @author kevin
@@ -208,6 +199,7 @@
          */
         public void resume (Object tx) throws TransactionStrategyException
         {
+            throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
         }
         
         /**
@@ -217,9 +209,20 @@
          */
         public void registerSynchronization (Synchronization sync) throws TransactionStrategyException
         {
+            throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
         }
         
         /**
+         * Add a resource to the current transaction; not supported by this strategy, so the call always fails.
+         * @param resource The XA resource (ignored).
+         * @throws TransactionStrategyException Always thrown.
+         */
+        public void enlistResource(XAResource resource) throws TransactionStrategyException
+        {
+            throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
+        }
+        
+        /**
          * Is the currently associated transaction active?
          * @return
          * @throws TransactionStrategyException

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,11 +34,7 @@
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
 import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -46,6 +42,7 @@
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -154,9 +151,12 @@
                     Object obj = _processMethod.invoke(_composer, new Object[] {msgIn});
                     // commit and acknowledge the reception of the message
                     // this is done after extracting the content of the JMS Message.
-                    if ( jmsSession != null && jmsSession.getTransacted() )
-                    	jmsSession.commit();
-                    	
+                    if (jmsSession.getTransacted()) {
+                        jmsSession.commit() ;
+                    } else {
+                        msgIn.acknowledge() ;
+                    }
+                    
                     if (null == obj) {
                         _logger.warn("Action class method <"
                                 + _processMethod.getName()
@@ -246,17 +246,28 @@
     
     private void rollbackJMSTransaction() 
     {
-        try
-		{
-			if ( jmsSession != null && jmsSession.getTransacted() )
-				jmsSession.rollback();
-		} catch (JMSException e) {
-			final String errorMsg = "JMSException during jmsSession.rollback()";
-			_logger.error( errorMsg, e );
-		}
+        try {
+            if (jmsSession.getTransacted()) {
+                jmsSession.rollback() ;
+            } else {
+                releaseSession() ;
+            }
+        } catch (final JMSException jmse) {
+            releaseSession() ;
+        }
     }
     
 
+    private void releaseSession() { // hand the current JMS session (if any) back to the pool and drop the reference
+        if (jmsSession != null) {
+            try {
+                jmsConnectionPool.releaseSession(jmsSession);
+            } finally {
+                jmsSession = null; // cleared even if the pool call throws, so the session is never released twice from here
+            }
+        }
+    }
+
     private void cleanup() {
         try {
             if (_serviceName != null) {
@@ -409,7 +420,7 @@
             _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.ONE_ONE_PROTOCOL, destType,
                 jmsDestinationName, sFactClass, environment, _messageSelector, persistent, acknowledgeMode,
                 username, password, transacted );
-            jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, destType,username, password, transacted);
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, username, password);
             	
             try {
                 jmsSession = _myEpr != null ? jmsConnectionPool.getSession(((JMSEpr)_myEpr).getAcknowledgeMode()):
@@ -429,7 +440,7 @@
                     jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
                 }
                 catch (NamingException ne) {
-                    if(jmsSession instanceof QueueSession) {
+                    if(JMSEpr.QUEUE_TYPE.equals(destType)) {
                         jmsDestination = jmsSession.createQueue(jmsDestinationName);
                     } else {
                         jmsDestination = jmsSession.createTopic(jmsDestinationName);
@@ -440,17 +451,7 @@
             NamingContextPool.releaseNamingContext(oJndiCtx) ;
         }
 
-        if(jmsSession instanceof QueueSession && jmsDestination instanceof Queue) {
-            jmsMessageConsumer = ((QueueSession)jmsSession).createReceiver((Queue)jmsDestination, _messageSelector);
-        } else if(jmsSession instanceof TopicSession && jmsDestination instanceof Topic) {
-            jmsMessageConsumer = ((TopicSession)jmsSession).createSubscriber((Topic)jmsDestination, _messageSelector, false);
-        } else {
-            try {
-                throw new ConfigurationException("The JMS destination identified by name '" + jmsDestinationName + "' must match it's configured destination type '" + jmsDestinationName + "'.");
-            } finally {
-                cleanup();
-            }
-        }
+        jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
     } // ________________________________
 
     /**
@@ -501,7 +502,7 @@
 
     protected String jmsDestinationName;
 
-    protected Session jmsSession;
+    protected JmsSession jmsSession;
 
     protected Destination jmsDestination;
 

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,8 +32,6 @@
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -87,10 +85,6 @@
          * The error delay.
          */
         private long errorDelay ;
-        
-        private TransactionStrategy transactionStrategy;
-        private boolean transactional = false;
-        private boolean rollbackOnPipelineFaults = true;
 
         /**
 	 * public constructor
@@ -162,11 +156,6 @@
                     }
                 }
                 _latencySecs = lSeconds ;
-                
-                transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
-                transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
-                
-                rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
 	}
 
         /**
@@ -272,53 +261,17 @@
             }
         }
 
-        /**
-         * We have JMS transactional delivery/work semantics: before pulling a unit of work
-         * we start a transaction. If the pipeline completes successfully then we will
-         * commit that transaction and the OUW will be deleted. If we have to roll back
-         * the transaction then the UOW will be placed back on the input "queue" (assumes that
-         * the courier is transactional).
-         * 
-         * @param maxWaitMillis
-         */
 	public void waitForEventAndProcess (long maxWaitMillis)
 	{
 		Message message = null ;
-		boolean problem = false;
-		
 		try
 		{
-			transactionStrategy.begin();
-			
-			/*
-			 * If this is a transactional receive then the courier
-			 * needs to be reset afterwards, because we can only
-			 * guarantee one instance per transaction. If the courier
-			 * instance does some handy multiplexing internally across different
-			 * transactions then we won't be able to handle that at this level.
-			 * However, at the moment that isn't an issue.
-			 */
-			
-			TransactionStrategy.setStrategy(transactionStrategy);
-			
 			message = (maxWaitMillis > 0) ? _pickUpCourier
 					.pickup(maxWaitMillis) : null;
                         errorDelay = 0 ;
 		}
-		catch (TransactionStrategyException ex)
-		{
-			// could not begin transaction!
-			
-			_logger.error("Could not begin transaction!");
-			
-			problem = true;
-			
-			return;
-		}
 		catch (CourierTimeoutException e)
 		{
-			problem = true;
-			
 			return;
 		}
 		catch (FaultMessageException fme)
@@ -327,79 +280,35 @@
 		}
 		catch (CourierException e)
 		{
-			_logger.debug("Courier Exception", e);
-			if (errorDelay == 0)
-			{
-				errorDelay = MIN_ERROR_DELAY ;
-			}
-			else if (errorDelay < MAX_ERROR_DELAY)
-			{
-				errorDelay <<= 1 ;
-			}
-			e.printStackTrace();
-			_logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
-			waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
-
-			problem = true;
-			
+                        _logger.debug("Courier Exception", e);
+                        if (errorDelay == 0)
+                        {
+                            errorDelay = MIN_ERROR_DELAY ;
+                        }
+                        else if (errorDelay < MAX_ERROR_DELAY)
+                        {
+                            errorDelay <<= 1 ;
+                        }
+                        _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
+                        waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
 			return;
 		}
-		finally
-		{
-			if (problem)
-			{
-				try
-				{
-					if (transactionStrategy.getTransaction() != null)
-					{
-						CourierUtil.cleanCourier(_pickUpCourier);
-					
-						resetCourier();
-					}
-				}
-				catch (Throwable ex)
-				{
-					CourierUtil.cleanCourier(_pickUpCourier);
-					
-					resetCourier(); // to be on the safe side!
-				}
-			
-				rollbackTransaction();
-			}
-			
-			TransactionStrategy.removeStrategy();
-		}
 
 		if (null != message)
 		{
-			try
-			{
-				final Message pipelineMessage = message ;
-				final Object txHandle = transactionStrategy.suspend();
-				final TransactionalRunner txRunner = new TransactionalRunner(_pickUpCourier, pipelineMessage, txHandle);
-				
-				updateThreadCount(+1);
-				_execService.execute(txRunner);
-				
-				if (transactional)
-				{
-					_pickUpCourier = null;  // runner will clean it up.
-					
-					resetCourier();  // we need another courier for the next msg.
-				}
-			}
-			catch (TransactionStrategyException ex)
-			{
-				_logger.warn("Caught transaction related exception: ", ex);
-				rollbackTransaction();
-			}
+                    final Message pipelineMessage = message ;
+                    final Runnable pipelineRunner = new Runnable() {
+                        public void run() {
+                            try {
+                                pipeline.process(pipelineMessage) ;
+                            } finally {
+                                updateThreadCount(-1) ;
+                            }
+                        }
+                    } ;
+                    updateThreadCount(+1);
+                    _execService.execute(pipelineRunner);
 		}
-		else
-		{
-			// nothing to do, so roll back the transaction before returning.
-			
-			rollbackTransaction();
-		}
 
 	} // ________________________________
 
@@ -483,134 +392,7 @@
                 }
             }
         }
-        
-        private void rollbackTransaction ()
-        {
-        	try
-        	{
-        		transactionStrategy.rollbackOnly();
-        		transactionStrategy.terminate();
-        	}
-        	catch (Throwable ex)
-        	{
-        		_logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
-        	}
-        }
-        
-        private void resetCourier ()
-        {
-        	TwoWayCourier pickUpCourier = null;
 
-            try
-            {
-                pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
-                try
-                {
-                    final Method setPollLatency = pickUpCourier.getClass().getMethod(
-                        "setPollLatency", new Class[] { Long.class });
-                    setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
-                }
-                catch (final NoSuchMethodException nsme)
-                {
-                        // OK, just leave it null
-                }
-                catch (final Exception ex)
-                {
-                    CourierUtil.cleanCourier(pickUpCourier);
-                    
-                    _logger.error("Problems invoking setPollLatency(long)", ex);
-                }
-            }
-            catch (final MalformedEPRException mepre)
-            {
-                _logger.error("Malformed EPR: " + _epr) ;
-            }
-            catch (final CourierException ce)
-            {
-                _logger.error("No appropriate courier can be obtained for " + _epr, ce);
-            }
-
-            _pickUpCourier = pickUpCourier;
-        }
-
-        class TransactionalRunner implements Runnable
-        {
-        	public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
-        	{
-        		_courier = courier;
-        		_pipelineMessage = pipelineMessage;
-        		_txHandle = txHandle;
-        	}
-        	
-        	public void run()
-        	{
-        		boolean problem = false;
-        		
-        		try
-        		{
-        			transactionStrategy.resume(_txHandle);
-        			
-        			pipeline.setTransactional(transactional);
-        			
-        			TransactionStrategy.setStrategy(transactionStrategy);
-        			
-        			/*
-        			 * Current strategy is to commit as long as process returns true.
-        			 * If fails, or any exceptions are caught, then we roll back.
-        			 * 
-        			 * TODO re-examine the semantics around true/false from the pipeline.
-        			 */
-        			
-        			// TODO consider adding a RollbackOnFalse option to allow override.
-        			
-        			problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
-
-        			if (!problem)
-        			{
-        				transactionStrategy.terminate();
-        			}
-        		}
-        		catch (TransactionStrategyException ex)
-        		{
-        			problem = true;
-        			
-        			_logger.warn("TransactionalRunner caught transaction exception: ", ex);
-        		}
-        		catch (RuntimeException ex)
-        		{
-        			problem = true;
-        			
-        			throw ex;
-        		}
-        		catch (Throwable ex)
-        		{
-        			problem = true;
-        			
-        			_logger.warn("TransactionalRunner caught throwable: ",ex);
-        		}
-        		finally
-        		{
-        			if (problem)
-        			{
-        				rollbackTransaction();
-        			}
-        				
-        			if (transactional)
-        			{
-        				CourierUtil.cleanCourier(_courier);
-        			}
-        			
-        			TransactionStrategy.removeStrategy();
-        			
-        			updateThreadCount(-1);
-        		}
-        	}
-        	
-        	private PickUpOnlyCourier _courier;
-        	private Message _pipelineMessage;
-        	private Object _txHandle;
-        };
-        
         private ConfigTree _config;
 
         private String _eprCategoryName;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -88,18 +88,6 @@
 	protected abstract void send(final Message p_oMsg, MessageProducer msgProducer ) throws JMSException;
 	
 	/**
-	 * Get a session specific to the subclasses implementation 
-	 * i.e Queue or Topic.
-	 * 
-	 * @param pool	
-	 * @return Session	
-	 * @throws NamingException
-	 * @throws JMSException
-	 * @throws ConnectionException
-	 */
-	protected abstract Session getSession( JmsConnectionPool pool ) throws NamingException, JMSException, ConnectionException;
-	
-	/**
 	 * Creates a message producer specific to the subclasses implementation
 	 * i.e QueueSender or TopicPublisher.
 	 * 
@@ -263,7 +251,7 @@
 
 		setJMSProperties( esbMessage, jmsMessage );
 
-		for (Iterator II = m_oProps.keySet().iterator(); II.hasNext();)
+		for (Iterator<Object> II = m_oProps.keySet().iterator(); II.hasNext();)
 		{
 		    String sKey = (String) II.next();
 		    String sVal = m_oProps.getProperty(sKey);
@@ -337,12 +325,11 @@
 	 * Will setup/create JMS connections, sessions, producers.
 	 * 
 	 * @param configTrees
-	 * @param destinationType
 	 * @throws ConfigurationException
 	 * @throws JMSException
 	 * @throws ConnectionException
 	 */
-	protected void setUpProducers (final ConfigTree[] configTrees, final String destinationType) throws ConfigurationException, JMSException, ConnectionException
+	protected void setUpProducers (final ConfigTree[] configTrees) throws ConfigurationException, JMSException, ConnectionException
 	{
 		// REVIEW: The connection factory name is hardcoded and is the same as
 		// that of the queue connection factory.
@@ -379,9 +366,9 @@
                 
                 String connectionFactory = configTrees[i].getAttribute(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
                 
-                connectionPools[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory, destinationType);
-                sessions[i] = getSession( connectionPools[i] );
-				producers[i] = createProducer( connectionPools[i], sAtt, sessions[i], environment );
+                connectionPools[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory);
+                sessions[i] = connectionPools[i].getSession() ;
+                producers[i] = createProducer( connectionPools[i], sAtt, sessions[i], environment );
                 
                 final String persistentStr = configTrees[i].getAttribute( PERSISTENT_ATTR, "true" );
                 deliveryModes[i] = persistentStr.equalsIgnoreCase( "true" ) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -24,12 +24,10 @@
 
 import java.util.Properties;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.naming.Context;
 import javax.naming.NamingException;
@@ -38,7 +36,6 @@
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.helpers.NamingContextException;
 import org.jboss.soa.esb.helpers.NamingContextPool;
@@ -57,7 +54,6 @@
  */
 public class NotifyQueues extends NotifyJMS
 {
-	@SuppressWarnings("unused")
 	private Logger log = Logger.getLogger( NotifyQueues.class );
 	
 	/**
@@ -72,64 +68,50 @@
     public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
 	{
 		super(p_oP);
-		setQueues(p_oP.getChildren(CHILD_QUEUE));
+		setUpProducers(p_oP.getChildren(CHILD_QUEUE));
 	} // __________________________________
-
-	protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
-	{
-		setUpProducers( p_oaP, JMSEpr.QUEUE_TYPE );
-	}
 	
 	protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
 	{
-		QueueSender oCurr = (QueueSender) msgProducer;
 		if ( log.isDebugEnabled() )
 		{
 			log.debug( "Sending to queue with DeliveryMode : " + msgProducer.getDeliveryMode());
 			log.debug( "Sending to queue with Priority : " + msgProducer.getPriority());
 			log.debug( "Sending to queue with TTL : " + msgProducer.getTimeToLive());
 		}
-		oCurr.send(p_oMsg);
+		msgProducer.send(p_oMsg);
 	}
 	
-	protected QueueSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
-	{
-        return pool.getQueueSession();
-	}
-	
 	protected MessageProducer createProducer(
 			final JmsConnectionPool pool, 
 			final String destinationName, 
 			final Session session,
 			final Properties environment) throws NamingException, JMSException, ConnectionException
 	{
-		
-		QueueSession queueSession = (QueueSession) session;
-		
 	try
 	{
             Context context = NamingContextPool.getNamingContext(environment);
             try
             {
-                Queue queue=null;
+                Destination destination=null;
                 try 
                 {
-                    queue = (Queue) context.lookup(destinationName);
+                    destination = (Destination) context.lookup(destinationName);
                 } 
                 catch (NamingException ne) 
                 {
                     context = NamingContextPool.replaceNamingContext(context, environment);
                     try 
                     {
-                        queue = (Queue) context.lookup(destinationName);
+                        destination = (Destination) context.lookup(destinationName);
                     } 
                     catch (NamingException nex) 
                     {
                         //ActiveMQ
-                        queueSession.createTopic(destinationName);
+                        session.createTopic(destinationName);
                     }
                 }
-    		return queueSession.createSender(queue);
+    		return session.createProducer(destination);
             }
             finally
             {

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -24,20 +24,17 @@
 
 import java.util.Properties;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
 import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.helpers.NamingContextException;
 import org.jboss.soa.esb.helpers.NamingContextPool;
@@ -56,58 +53,44 @@
 	{
 		super(p_oP);
 
-		setTopics(p_oP.getChildren(CHILD_TOPIC));
+		setUpProducers(p_oP.getChildren(CHILD_TOPIC));
 	} // __________________________________
 
-	protected void setTopics (ConfigTree[] configTrees) throws ConfigurationException, JMSException, ConnectionException
-	{
-		setUpProducers( configTrees, JMSEpr.TOPIC_TYPE );
-	}
-
 	protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
 	{
-		TopicPublisher oCurr = (TopicPublisher) msgProducer;
-		oCurr.publish(p_oMsg);
+		msgProducer.send(p_oMsg);
 	}
 	
-	protected TopicSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
-	{
-        return pool.getTopicSession();
-	}
-	
 	protected MessageProducer createProducer(
 			final JmsConnectionPool pool, 
 			final String destinationName, 
 			final Session session,
 			final Properties environment) throws NamingException, JMSException, ConnectionException
 	{
-		
-		TopicSession topicSession = (TopicSession) session;
-		
 	try
 	{
             Context context = NamingContextPool.getNamingContext(environment);
             try
             {
-                Topic topic=null;
+                Destination destination=null;
                 try 
                 {
-                    topic = (Topic) context.lookup(destinationName);
+                    destination = (Destination) context.lookup(destinationName);
                 } 
                 catch (NamingException ne) 
                 {
                     context = NamingContextPool.replaceNamingContext(context, environment);
                     try 
                     {
-                        topic = (Topic) context.lookup(destinationName);
+                        destination = (Destination) context.lookup(destinationName);
                     } 
                     catch (NamingException nex) 
                     {
                         //ActiveMQ
-                        topicSession.createTopic(destinationName);
+                        session.createTopic(destinationName);
                     }
                 }
-    		return topicSession.createPublisher(topic);
+    		return session.createProducer(destination);
             }
             finally
             {

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -136,6 +136,7 @@
 
         // this is just to make sure they're unique - i.e. so as 1+
         // "things" can listen to the same schedule...
+        // This is not thread safe!
         name += ("-" + nameDelta++);
         trigger.setName(name);
 

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,7 +34,6 @@
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -54,7 +53,7 @@
 	public void getJmsSession() throws CourierException, JMSException
 	{
 		Session jmsSession = jmsCourier.getJmsSession();
-		assertTrue( jmsSession.getTransacted() );
+		assertFalse( jmsSession.getTransacted() );
 		jmsCourier.cleanup();
 	}
 	

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -48,10 +48,9 @@
 	public void createPoolKey_null_environment()
 	{
 		final Properties env = null;
-		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE );
+		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory);
 		
 		assertEquals( connectionFactory, poolKey.get( CONNECTION_FACTORY_TAG ) );
-		assertEquals( QUEUE_TYPE, poolKey.get( DESTINATION_TYPE_TAG ) );
 	}
 	
 	@Test
@@ -62,7 +61,7 @@
 		final Properties env = new Properties();
 		env.put( Context.SECURITY_PRINCIPAL, username );
 		env.put( Context.SECURITY_CREDENTIALS, password );
-		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE);
+		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory);
 		assertEquals( username, poolKey.get( Context.SECURITY_PRINCIPAL ) );
 		assertEquals( password, poolKey.get( Context.SECURITY_CREDENTIALS ) );
 	}
@@ -73,32 +72,12 @@
 		final String username = "daniel";
 		final String password = "passwd";
 		final Properties env = null;
-		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
-				username, password, false);
+		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory,
+				username, password);
 		assertEquals( username, poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG ) );
 		assertEquals( password, poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG ) );
 	}
 	
-	@Test
-	public void createPoolKey_with_transacted_properties()
-	{
-		final boolean transacted = true;
-		final Properties env = null;
-		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
-				null, null, transacted);
-		assertEquals( transacted, Boolean.valueOf ( poolKey.get( JMSEpr.TRANSACTED_TAG ) ) );
-	}
-	
-	@Test
-	public void createPoolKey_with_non_transacted_properties()
-	{
-		final boolean transacted = false;
-		final Properties env = null;
-		Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
-				null, null, transacted);
-		assertEquals( transacted, Boolean.valueOf ( poolKey.get( JMSEpr.TRANSACTED_TAG ) ) );
-	}
-	
 	public static junit.framework.Test suite()
 	{
 		return new JUnit4TestAdapter( JmsConnectionPoolContainerUnitTest.class );

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -131,14 +131,6 @@
 		{
 			return null;
 		}
-
-		@Override
-		protected Session getSession( JmsConnectionPool pool )
-				throws NamingException, JMSException, ConnectionException
-		{
-			return null;
-		}
-		
 	}
 	
 	private static class MockJMSSession implements Session

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -36,6 +36,7 @@
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
 import javax.jms.TextMessage;
 import javax.naming.Context;
 import javax.naming.NamingException;
@@ -265,10 +266,44 @@
             }
             else
             {
-                return method.invoke(queueConnection, args) ;
+                final Object response = method.invoke(queueConnection, args) ;
+                if (response instanceof QueueSession)
+                {
+                    final QueueSession queueSession = (QueueSession)response ;
+                    return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+                            new MockQueueSessionInvocationHandler(queueSession)) ;
+                }
+                else
+                {
+                    return response ;
+                }
             }
         }
     }
+    
+    private static final class MockQueueSessionInvocationHandler implements InvocationHandler
+    {
+        private final QueueSession queueSession ;
+            
+        MockQueueSessionInvocationHandler(final QueueSession queueSession)
+        {
+            this.queueSession = queueSession ;
+        }
+            
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("recover".equals(methodName))
+            {
+                return null ;
+            }
+            else
+            {
+                return method.invoke(queueSession, args) ;
+            }
+        }
+    }
         
     public static junit.framework.Test suite()
 	{

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -33,6 +33,7 @@
 import javax.jms.ObjectMessage;
 import javax.jms.TextMessage;
 import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -217,8 +218,42 @@
                 }
                 else
                 {
-                    return method.invoke(topicConnection, args) ;
+                final Object response = method.invoke(topicConnection, args) ;
+                if (response instanceof TopicSession)
+                {
+                    final TopicSession topicSession = (TopicSession)response ;
+                    return (TopicSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSession.class},
+                            new MockTopicSessionInvocationHandler(topicSession)) ;
                 }
+                else
+                {
+                    return response ;
+                }
             }
         }
+    }
+    
+    private static final class MockTopicSessionInvocationHandler implements InvocationHandler
+    {
+        private final TopicSession topicSession ;
+        
+        MockTopicSessionInvocationHandler(final TopicSession topicSession)
+        {
+            this.topicSession = topicSession ;
+        }
+        
+        public Object invoke(final Object proxy, final Method method, final Object[] args)
+            throws Throwable
+        {
+            final String methodName = method.getName() ;
+            if ("recover".equals(methodName))
+            {
+                return null ;
+            }
+            else
+            {
+                return method.invoke(topicSession, args) ;
+            }
+        }
+    }
 }

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,7 +32,7 @@
 
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
 import org.jboss.soa.esb.common.Environment;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,14 +57,14 @@
     {
         JmsConnectionPool jmsConnectionPool = null;
         
-        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory", JMSEpr.QUEUE_TYPE);
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         //Open 3 concurrent sessions
-        Session session1 = jmsConnectionPool.getQueueSession();
+        JmsSession session1 = jmsConnectionPool.getSession();
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
-        Session session2 = jmsConnectionPool.getQueueSession();
+        JmsSession session2 = jmsConnectionPool.getSession();
         assertEquals(2, jmsConnectionPool.getSessionsInPool());
-        Session session3 = jmsConnectionPool.getQueueSession();
+        JmsSession session3 = jmsConnectionPool.getSession();
         assertEquals(3, jmsConnectionPool.getSessionsInPool());
         //Close them
         jmsConnectionPool.closeSession(session1);
@@ -76,8 +76,8 @@
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
         
-        //Use it again and add one session
-        jmsConnectionPool.getQueueSession();
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        jmsConnectionPool.getSession();
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
         //I should be able to remove the entire pool and have it do closing
@@ -88,61 +88,55 @@
     @Test
     public void testCreateSecondPool() throws Exception
     {
-        JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
-        jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+        JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
+        jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
         //This should be the same pool
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
     
-        JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+        JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory");
         //This should be a different pool, so now we should have 2.
         assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
         
-        JmsConnectionPool jmsConnectionPool3 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
-        //This should be a different pool, so now we should have 3.
-        assertEquals(3, JmsConnectionPoolContainer.getNumberOfPools());
-        
         //Now lets cleanup after ourselves
-        jmsConnectionPool3.removeSessionPool();
-        assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+        jmsConnectionPool2.removeSessionPool();
+        assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
         
         jmsConnectionPool1.removeSessionPool();
-        jmsConnectionPool2.removeSessionPool();
-        jmsConnectionPool3.removeSessionPool();
         assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools()); 
     }
     
     @Test
     public void testPoolAndSessionsWithAcknowledgeMode()  throws Exception
     {
-        JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory", JMSEpr.QUEUE_TYPE);
+        JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         
-        Session autoAckSession1 = jmsConnectionPool.getQueueSession(Session.AUTO_ACKNOWLEDGE);
+        JmsSession autoAckSession1 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
         assertEquals(Session.AUTO_ACKNOWLEDGE, autoAckSession1.getAcknowledgeMode());
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.AUTO_ACKNOWLEDGE));
         
-        Session autoAckSession2 = jmsConnectionPool.getQueueSession(Session.AUTO_ACKNOWLEDGE);
+        JmsSession autoAckSession2 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
         assertEquals(Session.AUTO_ACKNOWLEDGE, autoAckSession2.getAcknowledgeMode());
         assertEquals(2, jmsConnectionPool.getSessionsInPool());
         assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.AUTO_ACKNOWLEDGE));
         
-        Session clientAckSession1 = jmsConnectionPool.getQueueSession(Session.CLIENT_ACKNOWLEDGE);
+        JmsSession clientAckSession1 = jmsConnectionPool.getSession(Session.CLIENT_ACKNOWLEDGE);
         assertEquals(Session.CLIENT_ACKNOWLEDGE, clientAckSession1.getAcknowledgeMode());
         assertEquals(3, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.CLIENT_ACKNOWLEDGE));
         
-        Session clientAckSession2 = jmsConnectionPool.getQueueSession(Session.CLIENT_ACKNOWLEDGE);
+        JmsSession clientAckSession2 = jmsConnectionPool.getSession(Session.CLIENT_ACKNOWLEDGE);
         assertEquals(Session.CLIENT_ACKNOWLEDGE, clientAckSession2.getAcknowledgeMode());
         assertEquals(4, jmsConnectionPool.getSessionsInPool());
         assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.CLIENT_ACKNOWLEDGE));
         
-        Session dupsOkAcSession1 = jmsConnectionPool.getQueueSession(Session.DUPS_OK_ACKNOWLEDGE);
+        JmsSession dupsOkAcSession1 = jmsConnectionPool.getSession(Session.DUPS_OK_ACKNOWLEDGE);
         assertEquals(Session.DUPS_OK_ACKNOWLEDGE, dupsOkAcSession1.getAcknowledgeMode());
         assertEquals(5, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE));
         
-        Session dupsOkAcSession2 = jmsConnectionPool.getQueueSession(Session.DUPS_OK_ACKNOWLEDGE);
+        JmsSession dupsOkAcSession2 = jmsConnectionPool.getSession(Session.DUPS_OK_ACKNOWLEDGE);
         assertEquals(Session.DUPS_OK_ACKNOWLEDGE, dupsOkAcSession2.getAcknowledgeMode());
         assertEquals(6, jmsConnectionPool.getSessionsInPool());
         assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE));
@@ -182,7 +176,8 @@
         assertEquals(0, jmsConnectionPool.getSessionsInPool());
         assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
         
-        jmsConnectionPool.getQueueSession();
+        jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+        jmsConnectionPool.getSession();
         assertEquals(1, jmsConnectionPool.getSessionsInPool());
         assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
         

Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -162,7 +162,7 @@
 	 * @param f_sb statistics bean
 	 */
 	public void insertStatistics(StatisticsBean f_sb) {
-		TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+		TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
 		Object txHandle = null;
 		
 		if (txStrategy != null)

Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -67,7 +67,7 @@
 	 * @see org.jboss.soa.esb.monitoring.server.Filer#persistData()
 	 */
 	public void persistData() {
-		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
 		Object txHandle = null;
 		
 		if (txS != null)

Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java	2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java	2008-03-21 12:27:47 UTC (rev 19172)
@@ -95,7 +95,7 @@
 	public void insertOperations(ServiceControlCommand f_ob) {
 		Session sess = null;
 		Transaction tx = null;
-		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
 		Object txHandle = null;
 		
 		if (txS != null)
@@ -158,7 +158,7 @@
 	public void updateActiveFlag(String serverName) {
 		Session sess = null;
 		Transaction tx = null;
-		TransactionStrategy txS = TransactionStrategy.getStrategy();
+		TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
 		Object txHandle = null;
 		
 		if (txS != null)




More information about the jboss-svn-commits mailing list