[jboss-svn-commits] JBL Code SVN: r18218 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: src/org/jboss/internal/soa/esb/rosetta/pooling and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Jan 31 07:49:43 EST 2008
Author: mark.little at jboss.com
Date: 2008-01-31 07:49:43 -0500 (Thu, 31 Jan 2008)
New Revision: 18218
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1438
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-01-31 12:49:43 UTC (rev 18218)
@@ -49,6 +49,9 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+import org.jboss.soa.esb.common.JBossESBPropertyService.JTATransactionStrategy;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -142,6 +145,34 @@
public Session getJmsSession(final int acknowledgeMode) throws CourierException {
if(jmsSession == null) {
synchronized(this) {
+ try
+ {
+ TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+ boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+
+ transactional = (txHandle != null);
+
+ /*
+ * Make sure the current transaction is still active! If we
+ * have previously slept, then the timeout may be longer than that
+ * associated with the transaction.
+ */
+
+ /*
+ * MessageAwareListener will catch exceptions and roll back the transaction.
+ */
+
+ if (transactional && !isActive)
+ {
+ throw new CourierException("Associated transaction is no longer active!");
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ throw new CourierException(ex);
+ }
+
if(jmsSession == null) {
String sType;
@@ -185,7 +216,7 @@
if (null == message) {
return false;
}
-
+
if (_messageProducer == null) {
try {
createMessageProducer();
@@ -232,6 +263,34 @@
return false;
}
+ try
+ {
+ TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+ boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+
+ transactional = (txHandle != null);
+
+ /*
+ * Make sure the current transaction is still active! If we
+ * have previously slept, then the timeout may be longer than that
+ * associated with the transaction.
+ */
+
+ /*
+ * MessageAwareListener will catch exceptions and roll back the transaction.
+ */
+
+ if (transactional && !isActive)
+ {
+ throw new CourierException("Associated transaction is no longer active!");
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ throw new CourierException(ex);
+ }
+
if (_messageProducer == null) {
try {
createMessageProducer();
@@ -248,9 +307,11 @@
message.setStringProperty(key, kvp.getValue());
}
}
+
sendMessage(message);
- if ( jmsSession.getTransacted() )
+ if ( jmsSession.getTransacted() && !transactional )
jmsSession.commit();
+
return true;
}
catch (JMSException e) {
@@ -375,7 +436,14 @@
} // ________________________________
private JmsConnectionPool getConnectionPool() throws ConnectionException {
- if(jmsConnectionPool == null) {
+ /*
+ * We need to ignore the pool if we are being used within the scope of
+ * a global transaction. However, the pool conveniently wraps the JNDI
+ * lookup and configuration. Rather than refactor that out, for now we
+ * "work around" it.
+ */
+
+ if ((jmsConnectionPool == null) || (transactional)) {
synchronized(this) {
if(jmsConnectionPool == null) {
String sFactoryClass;
@@ -400,7 +468,11 @@
sFactoryClass = "ConnectionFactory";
}
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted);
+ /*
+ * Needs to be a one-shot instance if transactional.
+ */
+
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted, transactional);
}
}
}
@@ -549,7 +621,6 @@
}
} // ________________________________
-
long _sleepForRetries = 3000; // milliseconds
protected boolean _isReceiver;
@@ -570,6 +641,8 @@
protected volatile JmsConnectionPool jmsConnectionPool;
+ private boolean transactional = false;
+
/**
* Strategy for setting JMSProperties
*/
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-01-31 12:49:43 UTC (rev 18218)
@@ -44,6 +44,8 @@
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.helpers.NamingContextException;
import org.jboss.soa.esb.helpers.NamingContextPool;
@@ -123,7 +125,7 @@
throws NamingException, JMSException, ConnectionException, NamingContextException
{
String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
-
+
//Setup a connection if we don't have one
if (jmsConnection==null) {
JmsConnectionPoolContainer.addToPool(poolKey, this);
@@ -134,6 +136,25 @@
String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
Object factoryConnection=null;
try {
+ /*
+ * For XA connections, use java:/JmsXA which should return a wrapped
+ * XAConnectionFactory instance, which will be the base for the
+ * XAQueueConnectionFactory or XATopicConnectionFactory.
+ */
+
+ try
+ {
+ if (TransactionStrategy.getTransactionStrategy(true).getTransaction() != null)
+ {
+ if ("XAConnectionFactory".equals(connectionFactoryString))
+ connectionFactoryString = "java:/JmsXA";
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ logger.warn("Could not determine transaction strategy!", ex);
+ }
+
factoryConnection = jndiContext.lookup(connectionFactoryString);
} catch (NamingException ne) {
logger.info("Received NamingException, refreshing context.");
@@ -153,12 +174,29 @@
jmsConnection = useJMSSecurity ? factory.createTopicConnection(username,password): factory.createTopicConnection();
}
- jmsConnection.setExceptionListener(new ExceptionListener() {
- public void onException(JMSException arg0)
+ TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+
+ /*
+ * It is illegal to set ExceptionListeners if we are within a transaction.
+ */
+
+ try
+ {
+ if ((txStrategy != null) && (txStrategy.getTransaction() == null))
{
- removeSessionPool() ;
+ jmsConnection.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0)
+ {
+ removeSessionPool() ;
+ }
+ }) ;
}
- }) ;
+ }
+ catch (TransactionStrategyException ex)
+ {
+ throw new ConnectionException(ex);
+ }
+
jmsConnection.start();
} finally {
NamingContextPool.releaseNamingContext(jndiContext) ;
@@ -168,7 +206,7 @@
//Create a new Session
ArrayList<Session> freeSessions = freeSessionsMap.get( acknowledgeMode );
-
+
if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
QueueSession session = ((QueueConnection)jmsConnection).createQueueSession(transacted,acknowledgeMode);
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java 2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java 2008-01-31 12:49:43 UTC (rev 18218)
@@ -94,6 +94,14 @@
return getPool( enviroment, connectionFactory, destinationType, username, password, false );
}
+
+ public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
+ final String username, final String password, final boolean transacted)
+ throws ConnectionException
+ {
+ return getPool(enviroment, connectionFactory, destinationType, username, password, transacted, false);
+ }
+
/**
* Returns the pool given the identifiers for the JMS provider.
*
@@ -107,16 +115,31 @@
* @throws ConnectionException
*/
public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
- final String username, final String password, final boolean transacted)
+ final String username, final String password, final boolean transacted, final boolean ignore)
throws ConnectionException
{
Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, destinationType, username, password, transacted);
final Map<Map<String, String>, JmsConnectionPool> poolMap = getMap() ;
- if (poolMap.containsKey(poolKey)) {
+
+ if (poolMap.containsKey(poolKey) && !ignore) {
return poolMap.get(poolKey);
} else {
JmsConnectionPool pool = new JmsConnectionPool(poolKey);
- poolMap.put(poolKey, pool);
+
+ /*
+ * Horrible, but means we don't have to rewrite the pool code at this stage!
+ */
+
+ if (!ignore)
+ {
+ /*
+ * Put it into the pool unless this is a one-shot connection, e.g., for
+ * use in a global transaction.
+ */
+
+ poolMap.put(poolKey, pool);
+ }
+
return pool;
}
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java 2008-01-31 10:13:58 UTC (rev 18217)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/ListenerManagerJDBCUnitTest.java 2008-01-31 12:49:43 UTC (rev 18218)
@@ -38,7 +38,6 @@
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycle;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleController;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.body.content.BytesBody;
import org.jboss.soa.esb.message.format.MessageFactory;
public class ListenerManagerJDBCUnitTest extends ListenerManagerFileUnitTest
@@ -54,7 +53,6 @@
clearMessages() ;
-
try
{
Statement stmt = getDbConnection().createStatement();
More information about the jboss-svn-commits mailing list