[jboss-svn-commits] JBL Code SVN: r18218 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: src/org/jboss/internal/soa/esb/rosetta/pooling and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Jan 31 07:49:43 EST 2008


Author: mark.little at jboss.com
Date: 2008-01-31 07:49:43 -0500 (Thu, 31 Jan 2008)
New Revision: 18218

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1438

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2008-01-31 12:49:43 UTC (rev 18218)
@@ -49,6 +49,9 @@
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.common.JBossESBPropertyService.JTATransactionStrategy;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierTimeoutException;
 import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -142,6 +145,34 @@
     public Session getJmsSession(final int acknowledgeMode) throws CourierException {
         if(jmsSession == null) {
             synchronized(this) {
+        	try
+        	{
+                	TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+        		Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+        		boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+        		
+        		transactional = (txHandle != null);
+        		
+        		/*
+        		 * Make sure the current transaction is still active! If we
+        		 * have previously slept, then the timeout may be longer than that
+        		 * associated with the transaction.
+        		 */
+        		
+        		/*
+        		 * MessageAwareListener will catch exceptions and roll back the transaction.
+        		 */
+        		
+        		if (transactional && !isActive)
+        		{
+        			throw new CourierException("Associated transaction is no longer active!");
+        		}
+        	}
+        	catch (TransactionStrategyException ex)
+        	{
+        	    throw new CourierException(ex);
+        	}
+        	
                 if(jmsSession == null) {
                     String sType;
 
@@ -185,7 +216,7 @@
         if (null == message) {
             return false;
         }
-
+        
         if (_messageProducer == null) {
             try {
                 createMessageProducer();
@@ -232,6 +263,34 @@
             return false;
         }
 
+        try
+	{
+        	TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+		Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+		boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+		
+		transactional = (txHandle != null);
+		
+		/*
+		 * Make sure the current transaction is still active! If we
+		 * have previously slept, then the timeout may be longer than that
+		 * associated with the transaction.
+		 */
+		
+		/*
+		 * MessageAwareListener will catch exceptions and roll back the transaction.
+		 */
+		
+		if (transactional && !isActive)
+		{
+			throw new CourierException("Associated transaction is no longer active!");
+		}
+	}
+	catch (TransactionStrategyException ex)
+	{
+	    throw new CourierException(ex);
+	}
+        
         if (_messageProducer == null) {
             try {
                 createMessageProducer();
@@ -248,9 +307,11 @@
                         message.setStringProperty(key, kvp.getValue());
                     }
                 }
+                
                 sendMessage(message);
-                if ( jmsSession.getTransacted() )
+                if ( jmsSession.getTransacted() && !transactional )
 	                jmsSession.commit();
+                
                 return true;
             }
             catch (JMSException e) {
@@ -375,7 +436,14 @@
     } // ________________________________
 
     private JmsConnectionPool getConnectionPool() throws ConnectionException {
-        if(jmsConnectionPool == null) {
+	/*
+	 * We need to ignore the pool if we are being used within the scope of
+	 * a global transaction. However, the pool conveniently wraps the JNDI
+	 * lookup and configuration. Rather than refactor that out, for now we
+	 * "work around" it.
+	 */
+	
+        if ((jmsConnectionPool == null) || (transactional)) {
             synchronized(this) {
                 if(jmsConnectionPool == null) {
                     String sFactoryClass;
@@ -400,7 +468,11 @@
                         sFactoryClass = "ConnectionFactory";
                     }
 
-                    jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted);
+                    /*
+                     * Needs to be a one-shot instance if transactional.
+                     */
+                    
+                    jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted, transactional);
                 }
             }
         }
@@ -549,7 +621,6 @@
         }
     } // ________________________________
 
-
     long _sleepForRetries = 3000; // milliseconds
 
     protected boolean _isReceiver;
@@ -570,6 +641,8 @@
 
     protected volatile JmsConnectionPool jmsConnectionPool;
 
+    private boolean transactional = false;
+    
     /**
      * Strategy for setting JMSProperties
      */

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2008-01-31 12:49:43 UTC (rev 18218)
@@ -44,6 +44,8 @@
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
 import org.jboss.soa.esb.helpers.NamingContextException;
 import org.jboss.soa.esb.helpers.NamingContextPool;
 
@@ -123,7 +125,7 @@
     throws NamingException, JMSException, ConnectionException, NamingContextException
     {
         String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
-        
+
         //Setup a connection if we don't have one
         if (jmsConnection==null) {
             JmsConnectionPoolContainer.addToPool(poolKey, this);
@@ -134,6 +136,25 @@
                 String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
                 Object factoryConnection=null;
                 try {
+                    /*
+                     * For XA connections, use java:/JmsXA which should return a wrapped
+                     * XAConnectionFactory instance, which will be the base for the
+                     * XAQueueConnectionFactory or XATopicConnectionFactory.
+                     */
+                    
+                    try
+                    {
+                	if (TransactionStrategy.getTransactionStrategy(true).getTransaction() != null)
+                	{
+                	    if ("XAConnectionFactory".equals(connectionFactoryString))
+                        	connectionFactoryString = "java:/JmsXA";
+                	}
+                    }
+                    catch (TransactionStrategyException ex)
+                    {
+                	logger.warn("Could not determine transaction strategy!", ex);
+                    }
+
                     factoryConnection = jndiContext.lookup(connectionFactoryString);
                 } catch (NamingException ne) {
                     logger.info("Received NamingException, refreshing context.");
@@ -153,12 +174,29 @@
                     jmsConnection = useJMSSecurity ? factory.createTopicConnection(username,password): factory.createTopicConnection();
                 }
                 
-                jmsConnection.setExceptionListener(new ExceptionListener() {
-                    public void onException(JMSException arg0)
+                TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+                
+                /*
+                 * It is illegal to set ExceptionListeners if we are within a transaction.
+                 */
+                
+                try
+                {
+                    if ((txStrategy != null) && (txStrategy.getTransaction() == null))
                     {
-                        removeSessionPool() ;
+    		                jmsConnection.setExceptionListener(new ExceptionListener() {
+                            public void onException(JMSException arg0)
+                            {
+                                removeSessionPool() ;
+                            }
+                        }) ;
                     }
-                }) ;
+                }
+                catch (TransactionStrategyException ex)
+                {
+                    throw new ConnectionException(ex);
+                }
+                
                 jmsConnection.start();
             } finally {
                 NamingContextPool.releaseNamingContext(jndiContext) ;
@@ -168,7 +206,7 @@
         
         //Create a new Session
         ArrayList<Session> freeSessions = freeSessionsMap.get( acknowledgeMode );
-        
+
         if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
             QueueSession session = ((QueueConnection)jmsConnection).createQueueSession(transacted,acknowledgeMode);
                     

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	2008-01-31 12:49:43 UTC (rev 18218)
@@ -94,6 +94,14 @@
     	return getPool( enviroment, connectionFactory, destinationType, username, password, false );
     	
     }
+    
+    public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
+		final String username, final String password, final boolean transacted)
+    throws ConnectionException
+    {
+	return getPool(enviroment, connectionFactory, destinationType, username, password, transacted, false);
+    }
+    
     /**
      * Returns the pool given the identifiers for the JMS provider.
      * 
@@ -107,16 +115,31 @@
      * @throws ConnectionException
      */
     public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
-    		final String username, final String password, final boolean transacted)
+    		final String username, final String password, final boolean transacted, final boolean ignore)
         throws ConnectionException
     {
         Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, destinationType, username, password, transacted);
         final Map<Map<String, String>, JmsConnectionPool> poolMap = getMap() ;
-        if (poolMap.containsKey(poolKey)) {
+  
+        if (poolMap.containsKey(poolKey) && !ignore) {
             return poolMap.get(poolKey);
         } else {
             JmsConnectionPool pool = new JmsConnectionPool(poolKey);
-            poolMap.put(poolKey, pool);
+            
+            /*
+             * Horrible, but means we don't have to rewrite the pool code at this stage!
+             */
+            
+            if (!ignore)
+            {
+        	/*
+        	 * Put it into the pool unless this is a one-shot connection, e.g., for
+        	 * use in a global transaction.
+        	 */
+        	
+        	poolMap.put(poolKey, pool);
+            }
+            
             return pool;
         }
     }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java	2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java	2008-01-31 12:49:43 UTC (rev 18218)
@@ -38,7 +38,6 @@
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycle;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleController;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.body.content.BytesBody;
 import org.jboss.soa.esb.message.format.MessageFactory;
 
 public class ListenerManagerJDBCUnitTest extends ListenerManagerFileUnitTest
@@ -54,7 +53,6 @@
 
 		clearMessages() ;
 		
-
 		try
 		{
 			Statement stmt = getDbConnection().createStatement();




More information about the jboss-svn-commits mailing list