[jboss-svn-commits] JBL Code SVN: r19172 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling and 12 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Mar 21 08:27:48 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-03-21 08:27:47 -0400 (Fri, 21 Mar 2008)
New Revision: 19172
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
Removed:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/xa/
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
Log:
Fix JMS/TX integration: JBESB-1618
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,11 +32,8 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
-import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.xml.parsers.ParserConfigurationException;
@@ -46,13 +43,10 @@
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
-import org.jboss.internal.soa.esb.rosetta.pooling.xa.XaJmsConnectionPool;
-import org.jboss.internal.soa.esb.rosetta.pooling.xa.XaJmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -125,57 +119,46 @@
}
} // ________________________________
- private void closeSession() {
- synchronized(this) {
- if (jmsSession != null) {
- try {
- getConnectionPool().closeSession(jmsSession);
- } catch (ConnectionException e) {
- _logger.error("Unable to get Connection Poll for closing of JMS Session.", e);
- } finally {
- jmsSession = null;
- }
+ private synchronized void closeSession() {
+ if (jmsSession != null) {
+ try {
+ getConnectionPool().closeSession(jmsSession);
+ } catch (ConnectionException e) {
+ _logger.error("Unable to get Connection Pool for closing of JMS Session.", e);
+ } finally {
+ jmsSession = null;
}
}
}
+
+ private synchronized void releaseSession() {
+ if (jmsSession != null) {
+ try {
+ getConnectionPool().releaseSession(jmsSession);
+ } catch (ConnectionException e) {
+ _logger.error("Unable to get Connection Pool for releasing of JMS Session.", e);
+ } finally {
+ jmsSession = null;
+ }
+ }
+ }
public Session getJmsSession() throws CourierException {
return getJmsSession(Session.AUTO_ACKNOWLEDGE);
}
- public Session getJmsSession(final int acknowledgeMode) throws CourierException {
- reset();
-
+ public synchronized Session getJmsSession(final int acknowledgeMode) throws CourierException {
if(jmsSession == null) {
- synchronized(this) {
- checkTransaction();
-
- if(jmsSession == null) {
- String sType;
-
- try {
- sType = _epr.getDestinationType();
- } catch (URISyntaxException e) {
- throw new CourierException("EPR.getDestinationType failed.", e);
- }
-
- try {
- if (JMSEpr.QUEUE_TYPE.equals(sType)) {
- jmsSession = getConnectionPool().getQueueSession(acknowledgeMode);
- } else {
- jmsSession = getConnectionPool().getTopicSession(acknowledgeMode);
- }
- } catch (NamingException e) {
- throw new CourierException("Failed to get JMS Session from pool.", e);
- } catch (JMSException e) {
- throw new CourierException("Failed to get JMS Session from pool.", e);
- } catch (ConnectionException e) {
- throw new CourierException("Failed to get JMS Session from pool.", e);
- }
- }
+ try {
+ jmsSession = getConnectionPool().getSession(acknowledgeMode);
+ } catch (NamingException e) {
+ throw new CourierException("Failed to get JMS Session from pool.", e);
+ } catch (JMSException e) {
+ throw new CourierException("Failed to get JMS Session from pool.", e);
+ } catch (ConnectionException e) {
+ throw new CourierException("Failed to get JMS Session from pool.", e);
}
}
-
return jmsSession;
}
@@ -194,11 +177,13 @@
return false;
}
- if (_messageProducer == null) {
- try {
- createMessageProducer();
- } catch (final NamingContextException nce) {
- throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+ synchronized(this) {
+ if (_messageProducer == null) {
+ try {
+ createMessageProducer();
+ } catch (final NamingContextException nce) {
+ throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+ }
}
}
@@ -239,66 +224,58 @@
if (null == message) {
return false;
}
-
- reset();
-
- if (_messageProducer == null) {
- try {
- createMessageProducer();
- } catch (final NamingContextException nce) {
- throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+ synchronized(this) {
+ if (_messageProducer == null) {
+ try {
+ createMessageProducer();
+ } catch (final NamingContextException nce) {
+ throw new CourierException("Unexpected exception attempting to access naming context pool", nce) ;
+ }
}
}
- while (null != _messageProducer) {
- try {
- for (KeyValuePair kvp : _messageProperties) {
- String key = kvp.getKey();
- if(message.getStringProperty(key) == null) {
- message.setStringProperty(key, kvp.getValue());
+ synchronized(this) {
+ while (_messageProducer != null) {
+ try {
+ for (KeyValuePair kvp : _messageProperties) {
+ String key = kvp.getKey();
+ if(message.getStringProperty(key) == null) {
+ message.setStringProperty(key, kvp.getValue());
+ }
}
+
+ _messageProducer.send(message);
+
+ return true;
}
-
- checkTransaction();
-
- sendMessage(message);
- if ( jmsSession.getTransacted() && !transactional )
- jmsSession.commit();
-
- return true;
+ catch (JMSException e) {
+ if (!jmsConnectRetry(e))
+ throw new CourierException("Caught exception during delivery and could not reconnect! ",e);
+ }
+ catch (Exception e) {
+ throw new CourierException(e);
+ }
}
- catch (JMSException e) {
- jmsConnectRetry(e);
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
}
return false;
} // ________________________________
- /**
- * send/publish a javax.jms.ObjectMessage (that will contain the serialized
- * ESB Message)
- *
- * @param jmsMessage
- */
- private void sendMessage(javax.jms.Message jmsMessage) throws JMSException, URISyntaxException {
- String sType = _epr.getDestinationType();
- if (JMSEpr.TOPIC_TYPE.equals(sType)) {
- ((TopicPublisher) _messageProducer).publish(jmsMessage);
- } else {
- _messageProducer.send(jmsMessage);
- }
-
- } // ________________________________
-
- private void jmsConnectRetry(Exception exc) {
+ private boolean jmsConnectRetry(Exception exc) {
_logger.debug("JMS error. Attempting JMS reconnect.", exc);
synchronized(this) {
- cleanup();
+ try {
+ if (jmsSession.getTransacted()) {
+ jmsSession.rollback() ;
+ return false ;
+ } else {
+ cleanup() ;
+ }
+ } catch (final JMSException jmse) {
+ releaseSession() ;
+ return false ;
+ }
final int maxRetry = 5; // TODO Magic number here!!!
for (int i1 = 0; i1 < maxRetry; i1++) {
@@ -320,195 +297,104 @@
}
} else {
_logger.debug("Failed to reconnect to JMS", e);
+
+ return false ;
}
}
}
}
+
+ return true ;
} // ________________________________
private void createMessageProducer() throws CourierException, NamingContextException {
Context oJndiCtx = null;
-
- reset();
-
- checkTransaction();
- if (_messageProducer == null) {
- synchronized(this) {
- if (_messageProducer == null) {
- try {
- oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
+ synchronized(this) {
+ if (_messageProducer == null) {
+ try {
+ oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
- String sType = _epr.getDestinationType();
- if (JMSEpr.QUEUE_TYPE.equals(sType)) {
- QueueSession qSess = (QueueSession) getJmsSession(_epr.getAcknowledgeMode());
- javax.jms.Queue queue = null;
+ String sType = _epr.getDestinationType();
+ if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+ Session qSess = getJmsSession(_epr.getAcknowledgeMode());
+ javax.jms.Queue queue = null;
+ try {
+ queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
+ .getDestinationName());
+ } catch (NamingException ne) {
try {
+ oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
.getDestinationName());
- } catch (NamingException ne) {
- try {
- oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
- } catch (NamingException nex) {
- //ActiveMQ
- queue = qSess.createQueue(_epr.getDestinationName());
- }
+ } catch (NamingException nex) {
+ //ActiveMQ
+ queue = qSess.createQueue(_epr.getDestinationName());
}
- _messageProducer = qSess.createSender(queue);
- } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
- TopicSession tSess = (TopicSession) getJmsSession(_epr.getAcknowledgeMode());
- Topic topic = null;
- try {
- topic = (Topic) oJndiCtx.lookup(_epr
- .getDestinationName());
- }
- catch (NamingException ne) {
- topic = tSess.createTopic(_epr.getDestinationName());
- }
- _messageProducer = tSess.createPublisher(topic);
- } else {
- throw new CourierException("Unknown destination type");
}
- _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
- if ( _logger.isDebugEnabled() )
- _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+ _messageProducer = qSess.createProducer(queue);
+ } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+ Session tSess = getJmsSession(_epr.getAcknowledgeMode());
+ Topic topic = null;
+ try {
+ topic = (Topic) oJndiCtx.lookup(_epr
+ .getDestinationName());
+ }
+ catch (NamingException ne) {
+ topic = tSess.createTopic(_epr.getDestinationName());
+ }
+ _messageProducer = tSess.createProducer(topic);
+ } else {
+ throw new CourierException("Unknown destination type");
}
- catch (JMSException ex) {
- _logger.debug("Error from JMS system.", ex);
+ _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
+ if ( _logger.isDebugEnabled() )
+ _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+ }
+ catch (JMSException ex) {
+ _logger.debug("Error from JMS system.", ex);
- throw new CourierException(ex);
+ throw new CourierException(ex);
+ }
+ catch (URISyntaxException ex) {
+ throw new CourierException(ex);
+ } finally {
+ if (oJndiCtx != null) {
+ NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
- catch (URISyntaxException ex) {
- throw new CourierException(ex);
- } finally {
- if (oJndiCtx != null) {
- NamingContextPool.releaseNamingContext(oJndiCtx) ;
- }
- }
}
}
}
} // ________________________________
private JmsConnectionPool getConnectionPool() throws ConnectionException {
- reset();
-
- if (jmsConnectionPool == null) {
- synchronized(this) {
- try
- {
- checkTransaction();
- }
- catch (CourierException ex)
- {
- throw new ConnectionException(ex);
- }
-
- if(jmsConnectionPool == null) {
- String sFactoryClass;
- String sType;
- Properties properties;
- String username;
- String password;
- boolean transacted;
+ synchronized(this) {
+ if(jmsConnectionPool == null) {
+ String sFactoryClass;
+ Properties properties;
+ String username;
+ String password;
- try {
- sFactoryClass = _epr.getConnectionFactory();
- sType = _epr.getDestinationType();
- properties = _epr.getJndiEnvironment();
- username = _epr.getJMSSecurityPrincipal();
- password = _epr.getJMSSecurityCredential();
- transacted = _epr.getTransacted();
- } catch (URISyntaxException e) {
- throw new ConnectionException("Unexpected exception while getting JMS connection pool.", e);
- }
+ try {
+ sFactoryClass = _epr.getConnectionFactory();
+ properties = _epr.getJndiEnvironment();
+ username = _epr.getJMSSecurityPrincipal();
+ password = _epr.getJMSSecurityCredential();
+ } catch (URISyntaxException e) {
+ throw new ConnectionException("Unexpected exception while getting JMS connection pool.", e);
+ }
- if (Util.isNullString(sFactoryClass)) {
- sFactoryClass = "ConnectionFactory";
- }
-
- /*
- * Needs to be a one-shot instance if transactional.
- */
-
- Object tx = null;
-
- try
- {
- TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
-
- tx = txS.getTransaction();
- }
- catch (TransactionStrategyException ex)
- {
- _logger.warn("Problem getting transaction strategy: ", ex);
-
- throw new ConnectionException(ex);
- }
-
- if (tx == null)
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, transacted);
- else
- jmsConnectionPool = XaJmsConnectionPoolContainer.getPool(properties, sFactoryClass, sType, username, password, true, tx); // force transacted to be true!
+ if (Util.isNullString(sFactoryClass)) {
+ sFactoryClass = "ConnectionFactory";
}
+
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(properties, sFactoryClass, username, password);
}
- }
- return jmsConnectionPool;
+ return jmsConnectionPool;
+ }
}
-
- private void reset ()
- {
- /*
- * Are we in a global transaction?
- */
-
- if (jmsConnectionPool instanceof XaJmsConnectionPool)
- {
- /*
- * If the global transaction has terminated then this pool instance has been
- * closed and we need to get another one.
- */
-
- if (!((XaJmsConnectionPool) jmsConnectionPool).active())
- {
- jmsConnectionPool = null;
- _messageProducer = null;
- _messageConsumer = null;
- jmsSession = null;
- }
- }
- }
- private void checkTransaction () throws CourierException
- {
- try
- {
- TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
- Object txHandle = txStrategy.getTransaction();
- boolean isActive = txStrategy.isActive();
-
- transactional = (txHandle != null);
-
- /*
- * Make sure the current transaction is still active! If we
- * have previously slept, then the timeout may be longer than that
- * associated with the transaction.
- */
-
- if (transactional && !isActive)
- {
- throw new CourierException("Associated transaction is no longer active!");
- }
- }
- catch (TransactionStrategyException ex)
- {
- throw new CourierException(ex);
- }
- }
-
public Message pickup(long millis) throws CourierException, CourierTimeoutException {
javax.jms.Message jmsMessage = pickupPayload(millis);
@@ -520,9 +406,10 @@
throw new CourierException("This is an outgoing-only Courier");
if (millis < 1)
throw new IllegalArgumentException("Timeout millis must be > 0");
- if (null == _messageConsumer) try {
- createMessageConsumer();
- }
+ synchronized(this) {
+ if (null == _messageConsumer) try {
+ createMessageConsumer();
+ }
catch (Exception e) {
try {
Thread.sleep(1000); // TODO magic number
@@ -531,21 +418,23 @@
}
throw new CourierException("Unable to create Message Consumer", e);
}
+ }
javax.jms.Message jmsMessage = null;
- while (null != _messageConsumer) {
- checkTransaction();
-
- try {
- jmsMessage = _messageConsumer.receive(millis);
- break;
+ synchronized(this) {
+ while (null != _messageConsumer) {
+ try {
+ jmsMessage = _messageConsumer.receive(millis);
+ break;
+ }
+ catch (JMSException e) {
+ if (!jmsConnectRetry(e))
+ throw new CourierException("Caught exception during receive and could not reconnect! ",e);
+ }
+ catch (Exception e) {
+ throw new CourierException(e);
+ }
}
- catch (JMSException e) {
- jmsConnectRetry(e);
- }
- catch (Exception e) {
- throw new CourierException(e);
- }
}
return jmsMessage;
} // ________________________________
@@ -594,68 +483,64 @@
private void createMessageConsumer() throws CourierException, ConfigurationException, MalformedEPRException, NamingContextException {
Context oJndiCtx = null;
- reset();
-
- if (_messageConsumer == null) {
- synchronized(this) {
- if (_messageConsumer == null) {
- boolean success = false;
- try {
- Properties environment = _epr.getJndiEnvironment();
- oJndiCtx = NamingContextPool.getNamingContext(environment);
- try
- {
- String sType = _epr.getDestinationType();
- if (JMSEpr.QUEUE_TYPE.equals(sType)) {
- QueueSession qSess = (QueueSession) getJmsSession(_epr.getAcknowledgeMode());
- javax.jms.Queue queue = null;
+ synchronized(this) {
+ if (_messageConsumer == null) {
+ boolean success = false;
+ try {
+ Properties environment = _epr.getJndiEnvironment();
+ oJndiCtx = NamingContextPool.getNamingContext(environment);
+ try
+ {
+ String sType = _epr.getDestinationType();
+ if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+ Session qSess = getJmsSession(_epr.getAcknowledgeMode());
+ javax.jms.Queue queue = null;
+ try {
+ queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
+ .getDestinationName());
+ } catch (NamingException ne) {
try {
+ oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
.getDestinationName());
- } catch (NamingException ne) {
- try {
- oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
- } catch (NamingException nex) {
- //ActiveMQ
- queue = qSess.createQueue(_epr.getDestinationName());
- }
- }
- _messageConsumer = qSess.createReceiver(queue, _epr.getMessageSelector());
- } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
- TopicSession tSess = (TopicSession) getJmsSession(_epr.getAcknowledgeMode());
- Topic topic = null;
- try {
- topic = (Topic) oJndiCtx.lookup(_epr.getDestinationName());
} catch (NamingException nex) {
//ActiveMQ
- topic = tSess.createTopic(_epr.getDestinationName());
+ queue = qSess.createQueue(_epr.getDestinationName());
}
-
- _messageConsumer = tSess.createConsumer(topic, _epr
- .getMessageSelector());
- } else {
- throw new CourierException("Unknown destination type");
}
- success = true;
- } finally {
- NamingContextPool.releaseNamingContext(oJndiCtx) ;
+ _messageConsumer = qSess.createConsumer(queue, _epr.getMessageSelector());
+ } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+ Session tSess = getJmsSession(_epr.getAcknowledgeMode());
+ Topic topic = null;
+ try {
+ topic = (Topic) oJndiCtx.lookup(_epr
+ .getDestinationName());
+ }
+ catch (NamingException ne) {
+ topic = tSess.createTopic(_epr.getDestinationName());
+ }
+ _messageConsumer = tSess.createConsumer(topic, _epr
+ .getMessageSelector());
+ } else {
+ throw new CourierException("Unknown destination type");
}
+ success = true;
+ } finally {
+ NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
- catch (JMSException ex) {
- _logger.debug("Error from JMS system.", ex);
+ }
+ catch (JMSException ex) {
+ _logger.debug("Error from JMS system.", ex);
- throw new CourierException(ex);
+ throw new CourierException(ex);
+ }
+ catch (URISyntaxException ex) {
+ throw new MalformedEPRException(ex);
+ }
+ finally {
+ if (!success) {
+ closeSession();
}
- catch (URISyntaxException ex) {
- throw new MalformedEPRException(ex);
- }
- finally {
- if (!success) {
- closeSession();
- }
- }
}
}
}
@@ -671,17 +556,15 @@
protected String _messageSelector;
- protected volatile Session jmsSession;
+ protected JmsSession jmsSession;
- protected volatile MessageProducer _messageProducer;
+ protected MessageProducer _messageProducer;
- protected volatile MessageConsumer _messageConsumer;
+ protected MessageConsumer _messageConsumer;
protected List<KeyValuePair> _messageProperties;
- protected volatile JmsConnectionPool jmsConnectionPool;
-
- private boolean transactional = false;
+ protected JmsConnectionPool jmsConnectionPool;
/**
* Strategy for setting JMSProperties
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -53,13 +53,6 @@
public class SqlTableCourier implements PickUpOnlyCourier, DeliverOnlyCourier
{
/**
- * disable default constructor
- */
- private SqlTableCourier()
- {
- }
-
- /**
* package protected constructor - Objects of Courier should only be
* instantiated by the Factory
*
@@ -149,7 +142,7 @@
try
{
- TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
@@ -237,7 +230,7 @@
{
try
{
- TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -28,15 +28,12 @@
import java.util.Properties;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueSession;
import javax.jms.Session;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -74,22 +71,40 @@
private int SLEEP_TIME = DEFAULT_SLEEP;
/** Number of free sessions in the pool that can be given out. Indexed by session key */
- private Map<Integer,ArrayList<Session>> freeSessionsMap = new HashMap<Integer,ArrayList<Session>>();
+ private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
/** Number of session that are currently in use. Indexed by session key mode */
- private Map<Integer,ArrayList<Session>> inUseSessionsMap = new HashMap<Integer,ArrayList<Session>>();
+ private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
/** Reference to a Queue or Topic Connection, we only need one per pool */
- protected Connection jmsConnection = null;
+ protected Connection jmsConnection ;
/** The Indentifier of the pool */
private Map<String, String> poolKey;
-
+
+ /**
+ * Mapping from transactions to sessions.
+ */
+ private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+ /**
+ * Mapping from sessions to transactions.
+ */
+ private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+
/** Logger */
private Logger logger = Logger.getLogger(this.getClass());
/**
+ * The flag representing XA aware connections.
+ */
+ private boolean isXAAware ;
+ /**
+ * Flag signifying that the pool has been terminated.
+ */
+ private boolean terminated ;
+
+ /**
* Contructor of the pool.
*
*/
@@ -105,81 +120,35 @@
MAX_SESSIONS = poolSize;
SLEEP_TIME = sleepTime;
- freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<Session>() );
- freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<Session>() );
- freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<Session>() );
+ freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<Session>() );
- inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<Session>() );
- inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<Session>() );
+ inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
}
/**
* This is where we create the sessions.
*
* @param poolKey
- * @throws NamingException
+ * @param transacted
* @throws JMSException
- * @throws ConnectionException
*/
- private synchronized void addAnotherSession(Map<String, String> poolKey, final int acknowledgeMode)
- throws NamingException, JMSException, ConnectionException, NamingContextException
+ private synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
+ throws JMSException
{
- String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
-
- //Setup a connection if we don't have one
- if (jmsConnection==null) {
- JmsConnectionPoolContainer.addToPool(poolKey, this);
- logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
- Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
- Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
- try {
- String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
- Object factoryConnection=null;
-
- try
- {
- factoryConnection = jndiContext.lookup(overrideName(connectionFactoryString));
- } catch (NamingException ne) {
- logger.info("Received NamingException, refreshing context.");
- jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
- factoryConnection = jndiContext.lookup(connectionFactoryString);
- }
- final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
- final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
- boolean useJMSSecurity = (username != null && password != null);
- logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
-
- if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
- QueueConnectionFactory factory = (QueueConnectionFactory) factoryConnection;
- jmsConnection = useJMSSecurity ? factory.createQueueConnection(username,password): factory.createQueueConnection();
- } else {
- TopicConnectionFactory factory = (TopicConnectionFactory) factoryConnection;
- jmsConnection = useJMSSecurity ? factory.createTopicConnection(username,password): factory.createTopicConnection();
- }
-
- addExceptionListener();
-
- jmsConnection.start();
- } finally {
- NamingContextPool.releaseNamingContext(jndiContext) ;
- }
- }
- final boolean transacted = Boolean.valueOf(poolKey.get(JMSEpr.TRANSACTED_TAG));
-
//Create a new Session
- ArrayList<Session> freeSessions = freeSessionsMap.get( acknowledgeMode );
-
- if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
- QueueSession session = ((QueueConnection)jmsConnection).createQueueSession(transacted,acknowledgeMode);
-
- freeSessions.add(session);
- } else if (JMSEpr.TOPIC_TYPE.equals(destinationType)) {
- TopicSession session = ((TopicConnection) jmsConnection).createTopicSession(transacted,acknowledgeMode);
- freeSessions.add(session);
+ ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
+ // For now we only support JTA transacted sessions
+ final JmsSession session ;
+ if (transacted) {
+ session = new JmsXASession(this, ((XAConnection)jmsConnection).createXASession());
} else {
- throw new ConnectionException("Unknown destination type");
+ session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
}
+ freeSessions.add(session);
logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
}
@@ -189,14 +158,35 @@
* @return Connection to be used
* @throws ConnectionException
*/
- public synchronized Session getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
+ public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
{
+ try {
+ initConnection() ;
+ } catch (final NamingContextException nce) {
+ throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
+ }
+ final boolean transacted ;
+ try {
+ transacted = (isXAAware && TransactionStrategy.getTransactionStrategy(true).isActive()) ;
+ } catch (final TransactionStrategyException tse) {
+ throw new ConnectionException("Failed to determine current transaction context", tse) ;
+ }
+
+ if (transacted)
+ {
+ final JmsXASession currentSession = getXASession() ;
+ if (currentSession != null)
+ {
+ return currentSession ;
+ }
+ }
+ final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
+
final long end = System.currentTimeMillis() + (SLEEP_TIME * 1000) ;
boolean emitExpiry = logger.isDebugEnabled() ;
for(;;) {
-
- ArrayList<Session> freeSessions = freeSessionsMap.get(acknowledgeMode );
- ArrayList<Session> inUseSessions = inUseSessionsMap.get(acknowledgeMode);
+ ArrayList<JmsSession> freeSessions = freeSessionsMap.get(mode );
+ ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
if (freeSessions.size() > 0)
{
if (logger.isDebugEnabled()) {
@@ -204,16 +194,11 @@
+ ", maxsize=" + MAX_SESSIONS
+ ", number of pools=" + JmsConnectionPoolContainer.getNumberOfPools());
}
- final Session session = freeSessions.remove(freeSessions.size()-1);
+ final JmsSession session = freeSessions.remove(freeSessions.size()-1);
inUseSessions.add(session);
return session ;
} else if (inUseSessions.size()<MAX_SESSIONS) {
- try {
- addAnotherSession(poolKey,acknowledgeMode);
- } catch (final NamingContextException nce) {
- throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
- }
-
+ addAnotherSession(poolKey, transacted, mode);
continue ;
} else {
if (emitExpiry)
@@ -239,74 +224,106 @@
}
}
/**
- * This method can be called whenever a Queue Session is needed from the pool.
+ * This method can be called whenever a Session is needed from the pool.
* @return
* @throws NamingException
* @throws JMSException
* @throws ConnectionException
*/
- public QueueSession getQueueSession() throws NamingException, JMSException, ConnectionException
+ public JmsSession getSession() throws NamingException, JMSException, ConnectionException
{
- return (QueueSession) getQueueSession(Session.AUTO_ACKNOWLEDGE);
+ return getSession(Session.AUTO_ACKNOWLEDGE);
}
- public QueueSession getQueueSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
- {
- return (QueueSession) getSession(acknowledgeMode);
+ /**
+ * This method closes an open connection and returns the connection to the pool.
+ * @param session The connection to be returned to the pool.
+ * @throws SQLException
+ * @deprecated
+ */
+ public void closeSession(Session session){
+ if (session instanceof JmsSession) {
+ closeSession((JmsSession)session) ;
+ } else {
+ logger.error("Invalid JMS Session type in closeSession: " + session);
+ }
}
/**
- * This method can be called whenever a Topic Session is needed from the pool.
- * @return
- * @throws NamingException
- * @throws JMSException
- * @throws ConnectionException
+ * This method closes an open connection and returns the connection to the pool.
+ * @param session The connection to be returned to the pool.
+ * @throws SQLException
*/
- public TopicSession getTopicSession() throws NamingException, JMSException, ConnectionException
+ public void closeSession(JmsSession session){
+ session.handleCloseSession(this) ;
+ }
+
+ /**
+ * Handle the real work of closing the connection.
+ * @param session The session to close.
+ */
+ synchronized void handleCloseSession(final JmsSession session)
{
- return (TopicSession) getTopicSession(Session.AUTO_ACKNOWLEDGE);
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.add(session) ;
+ }
+ handleReleaseSession(session) ;
}
- public TopicSession getTopicSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
+ /**
+ * Handle the real work of releasing the connection.
+ * @param session The session to release.
+ */
+ synchronized void handleReleaseSession(final JmsSession session)
{
- return (TopicSession) getSession(acknowledgeMode);
+ session.releaseResources() ;
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.remove(session) ;
+ }
+ notifyAll() ;
}
-
+
/**
- * This method closes an open connection and returns the connection to the pool.
- * @param sessionToClose The connection to be returned to the pool.
+ * This method closes an open session without returning it to the pool.
+ * @param session The session to be returned to the pool.
* @throws SQLException
*/
- public synchronized void closeSession(Session sessionToClose){
- try
- {
- ArrayList<Session> sessions = freeSessionsMap.get(sessionToClose.getAcknowledgeMode());
- if ( sessions != null )
- sessions.add(sessionToClose);
-
- releaseSession(sessionToClose) ;
- } catch (JMSException e)
- {
- logger.error("JMSException while calling getAcknowledgeMode", e);
- }
+ public synchronized void releaseSession(final JmsSession session) {
+ session.handleReleaseSession(this) ;
}
/**
* This method closes an open session without returning it to the pool.
- * @param sessionToClose The session to be returned to the pool.
+ * @param session The session to be returned to the pool.
* @throws SQLException
+ * @deprecated
*/
- public synchronized void releaseSession(final Session sessionToClose) {
- try
- {
- ArrayList<Session> inUseSessions = inUseSessionsMap.get(sessionToClose.getAcknowledgeMode());
- if ( inUseSessions != null )
- inUseSessions.remove(sessionToClose);
- notifyAll() ;
- } catch (JMSException e)
- {
- logger.error("JMSException while calling getAcknowledgeMode", e);
- }
+ public synchronized void releaseSession(final Session session) {
+ if (session instanceof JmsSession) {
+ releaseSession((JmsSession)session) ;
+ } else {
+ logger.error("Invalid JMS Session type in releaseSession: " + session);
+ }
}
/**
@@ -315,20 +332,18 @@
*/
public synchronized void removeSessionPool()
{
- freeSessionsMap.get(Session.AUTO_ACKNOWLEDGE).clear() ;
- freeSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).clear() ;
- freeSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).clear() ;
+ freeSessionsMap = null ;
+ inUseSessionsMap = null ;
+ transactionsToSessions = null ;
+ sessionsToTransactions = null ;
- inUseSessionsMap.get(Session.AUTO_ACKNOWLEDGE).clear() ;
- inUseSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).clear() ;
- inUseSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).clear() ;
-
logger.debug("Emptied the session pool now closing the connection to the factory.");
if (jmsConnection!=null) {
try {
jmsConnection.close();
} catch (final Exception ex) {} // ignore
jmsConnection=null;
+ terminated = true ;
}
JmsConnectionPoolContainer.removePool(poolKey);
}
@@ -338,13 +353,10 @@
* @return the session pool size
*/
public int getSessionsInPool() {
- int nrOfSessions = freeSessionsMap.get(Session.AUTO_ACKNOWLEDGE).size();
- nrOfSessions += freeSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).size();
- nrOfSessions += freeSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).size();
- nrOfSessions += inUseSessionsMap.get(Session.AUTO_ACKNOWLEDGE).size();
- nrOfSessions += inUseSessionsMap.get(Session.CLIENT_ACKNOWLEDGE).size();
- nrOfSessions += inUseSessionsMap.get(Session.DUPS_OK_ACKNOWLEDGE).size();
- return nrOfSessions;
+ return getSessionsInPool(Session.AUTO_ACKNOWLEDGE) +
+ getSessionsInPool(Session.CLIENT_ACKNOWLEDGE) +
+ getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE) +
+ getSessionsInPool(Session.SESSION_TRANSACTED) ;
}
/**
@@ -353,8 +365,8 @@
* @param acknowledgeMode the acknowledge mode of sessions
* @return
*/
- public int getSessionsInPool(final int acknowledgeMode) {
- return freeSessionsMap.get(acknowledgeMode).size() + inUseSessionsMap.get(acknowledgeMode).size();
+ public synchronized int getSessionsInPool(final int acknowledgeMode) {
+ return getFreeSessionsInPool(acknowledgeMode) + getInUseSessionsInPool(acknowledgeMode) ;
}
/**
@@ -362,8 +374,10 @@
* @param acknowledgeMode the acknowledge mode of sessions
* @return int the number of in use sessions
*/
- public int getFreeSessionsInPool(final int acknowledgeMode) {
- return freeSessionsMap.get(acknowledgeMode).size();
+ public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
+ final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
+ final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
+ return numFreeSessions;
}
/**
@@ -372,25 +386,126 @@
* @param acknowledgeMode the acknowledge mode of sessions
* @return int the number of in use sessions
*/
- public int getInUseSessionsInPool(final int acknowledgeMode) {
- return inUseSessionsMap.get(acknowledgeMode).size();
+ public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
+ final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
+ final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
+ return numInUseSessions;
}
- protected String overrideName (String name) throws ConnectionException
+ /**
+ * Initialise the connection.
+ * @throws ConnectionException If the pool has already been terminated.
+ * @throws NamingContextException for errors obtaining a naming context
+ * @throws NamingException for errors accessing a naming context
+ * @throws JMSException for errors creating the connection
+ */
+ private synchronized void initConnection()
+ throws ConnectionException, NamingContextException, NamingException, JMSException
{
- return name;
+ if (terminated)
+ {
+ throw new ConnectionException("Connection pool has been terminated") ;
+ }
+
+ if (jmsConnection==null) {
+ JmsConnectionPoolContainer.addToPool(poolKey, this);
+ logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
+ Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
+ Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
+ try {
+ String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
+ Object factoryConnection=null;
+
+ try
+ {
+ factoryConnection = jndiContext.lookup(connectionFactoryString);
+ } catch (NamingException ne) {
+ logger.info("Received NamingException, refreshing context.");
+ jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
+ factoryConnection = jndiContext.lookup(connectionFactoryString);
+ }
+ final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
+ final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
+ boolean useJMSSecurity = (username != null && password != null);
+ logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
+ if (factoryConnection instanceof XAConnectionFactory) {
+ final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
+ jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
+ isXAAware = true ;
+ freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+ } else if (factoryConnection instanceof ConnectionFactory) {
+ final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
+ jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
+ }
+
+ jmsConnection.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0)
+ {
+ removeSessionPool() ;
+ }
+ }) ;
+ jmsConnection.start();
+ } finally {
+ NamingContextPool.releaseNamingContext(jndiContext) ;
+ }
+ }
}
- protected void addExceptionListener () throws JMSException, ConnectionException
+ /**
+ * Get the current transaction.
+ * @return The transaction or null if none present.
+ * @throws ConnectionException if the transaction context cannot be obtained.
+ */
+ private Object getTransaction()
+ throws ConnectionException
{
- jmsConnection.setExceptionListener(new ExceptionListener() {
- public void onException(JMSException arg0)
- {
- removeSessionPool() ;
- }
- }) ;
+ try {
+ return TransactionStrategy.getTransactionStrategy(true).getTransaction() ;
+ } catch (final TransactionStrategyException tse) {
+ throw new ConnectionException("Failed to determine current transaction context", tse) ;
+ }
}
+ /**
+ * Get a JMS session associated with the current transaction.
+ * @return The JMS session or null if not associated.
+ * @throws ConnectionException For accessint the current transaction context
+ */
+ private synchronized JmsXASession getXASession()
+ throws ConnectionException
+ {
+ final Object tx = getTransaction() ;
+ return transactionsToSessions.get(tx) ;
+ }
+
+ /**
+ * Associate the JMS XA Session with the current transaction.
+ * @param session The XA session.
+ * @throws ConnectionException if there is no transaction active.
+ */
+ synchronized void associateTransaction(final JmsXASession session)
+ throws ConnectionException
+ {
+ final Object tx = getTransaction() ;
+ if (tx == null)
+ {
+ throw new ConnectionException("No active transaction") ;
+ }
+ transactionsToSessions.put(tx, session) ;
+ sessionsToTransactions.put(session, tx) ;
+ }
+
+ /**
+ * Disassociate the JMS XA Session from a transaction.
+ * @param session The XA session.
+ */
+ synchronized void disassociateTransaction(final JmsXASession session)
+ {
+ final Object tx = sessionsToTransactions.remove(session) ;
+ transactionsToSessions.remove(tx) ;
+ }
+
static
{
PropertyManager prop = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE);
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -70,50 +70,38 @@
{
return getPool(jmsEpr.getJndiEnvironment(),
jmsEpr.getConnectionFactory(),
- jmsEpr.getDestinationType(),
jmsEpr.getJMSSecurityPrincipal(),
- jmsEpr.getJMSSecurityCredential(),
- jmsEpr.getTransacted());
+ jmsEpr.getJMSSecurityCredential()) ;
}
/**
* Returns the pool given the identifiers for the JMS provider.
*
* @param enviroment - JNDI environment for which a JMSConnectionPool should be retreived
* @param connectionFactory - connectionfactory for which a JMSConnectionPool should be retreived
- * @param destinationType - destinationType(Queue or Topic) for which a JMSConnectionPool should be retreived
* @return <code>JmsConnectionPool</code>
* @throws ConnectionException
*/
- public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType)
+ public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory)
throws ConnectionException
{
- return getPool(enviroment, connectionFactory, destinationType, null, null, false);
+ return getPool(enviroment, connectionFactory, null, null);
}
- public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
- final String username, final String password)
- throws ConnectionException
- {
- return getPool( enviroment, connectionFactory, destinationType, username, password, false );
-
- }
/**
* Returns the pool given the identifiers for the JMS provider.
*
* @param enviroment - JNDI evironment for which a JMSConnectionPool should be retreived
* @param connectionFactory - connectionfactory for which a JMSConnectionPool should be retreived
- * @param destinationType - destinationType(Queue or Topic) for which a JMSConnectionPool should be retreived
* @param username - username that should be used to create the JMS Connection
* @param password - password that should be used to create the JMS Connection
- * @param transacted - should the JMS Session be transacted
* @return <code>JmsConnectionPool</code>
* @throws ConnectionException
*/
- public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory, String destinationType,
- final String username, final String password, final boolean transacted)
+ public static JmsConnectionPool getPool(Properties enviroment, String connectionFactory,
+ final String username, final String password)
throws ConnectionException
{
- Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, destinationType, username, password, transacted);
+ Map<String,String> poolKey = createPoolKey(enviroment, connectionFactory, username, password);
final Map<Map<String, String>, JmsConnectionPool> poolMap = getMap() ;
if (poolMap.containsKey(poolKey)) {
@@ -131,25 +119,23 @@
*
* @param environment - the JNDI environment parameters
* @param connectionFactory
- * @param destinationType
* @return
*/
- public static Map<String, String> createPoolKey(Properties environment, String connectionFactory, String destinationType)
+ public static Map<String, String> createPoolKey(Properties environment, String connectionFactory)
{
- return createPoolKey( environment, connectionFactory, destinationType, null, null, false );
+ return createPoolKey( environment, connectionFactory, null, null );
}
/**
* Creates a poolKey using the identifying parameters
*
* @param environment - the JNDI environment parameters
* @param connectionFactory
- * @param destinationType
* @param username the JMS username to be used. Used with {@link javax.jms.ConnectionFactory } createConnection
* @param password the JMS password to be used. Used with {@link javax.jms.ConnectionFactory } createConnection
* @return
*/
- public static Map<String, String> createPoolKey(Properties environment, String connectionFactory, String destinationType,
- final String username, final String password, final boolean transacted)
+ public static Map<String, String> createPoolKey(Properties environment, String connectionFactory,
+ final String username, final String password)
{
Map<String,String> poolKey = new HashMap<String,String>();
if (environment!=null) {
@@ -164,10 +150,7 @@
}
if (connectionFactory!=null) poolKey.put(JMSEpr.CONNECTION_FACTORY_TAG, connectionFactory);
- if (destinationType!=null) poolKey.put(JMSEpr.DESTINATION_TYPE_TAG, destinationType);
- poolKey.put(JMSEpr.TRANSACTED_TAG, String.valueOf(transacted));
-
return poolKey;
}
/**
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -0,0 +1,380 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.io.Serializable;
+import java.util.HashSet;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Wrapper for JMS session class, responsible for tracking resources and the pooling.
+ */
+public class JmsSession implements Session
+{
+ /**
+ * The session delegate.
+ */
+ private final Session session ;
+
+ /**
+ * The set of active queue browsers.
+ */
+ private HashSet<QueueBrowser> queueBrowserSet ;
+ /**
+ * The set of active message consumers.
+ */
+ private HashSet<MessageConsumer> messageConsumerSet ;
+ /**
+ * The set of active message producers.
+ */
+ private HashSet<MessageProducer> messageProducerSet ;
+
+ /**
+ * Create the session wrapper.
+ * @param session The session delegate.
+ * @param isJTA True if this tales part in a JTA transaction
+ * @throws JMSException
+ */
+ JmsSession(final Session session)
+ throws JMSException
+ {
+ this.session = session ;
+ }
+
+ public void close() throws JMSException
+ {
+ session.close();
+ }
+
+ public void commit() throws JMSException
+ {
+ session.commit();
+ }
+
+ public QueueBrowser createBrowser(Queue arg0, String arg1)
+ throws JMSException
+ {
+ return trackQueueBrowser(session.createBrowser(arg0, arg1));
+ }
+
+ public QueueBrowser createBrowser(Queue arg0) throws JMSException
+ {
+ return trackQueueBrowser(session.createBrowser(arg0));
+ }
+
+ public BytesMessage createBytesMessage() throws JMSException
+ {
+ associate() ;
+ return session.createBytesMessage();
+ }
+
+ public MessageConsumer createConsumer(Destination arg0, String arg1,
+ boolean arg2) throws JMSException
+ {
+ return trackMessageConsumer(session.createConsumer(arg0, arg1, arg2));
+ }
+
+ public MessageConsumer createConsumer(Destination arg0, String arg1)
+ throws JMSException
+ {
+ return trackMessageConsumer(session.createConsumer(arg0, arg1));
+ }
+
+ public MessageConsumer createConsumer(Destination arg0) throws JMSException
+ {
+ return trackMessageConsumer(session.createConsumer(arg0));
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1,
+ String arg2, boolean arg3) throws JMSException
+ {
+ return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1, arg2, arg3));
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1)
+ throws JMSException
+ {
+ return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1));
+ }
+
+ public MapMessage createMapMessage() throws JMSException
+ {
+ associate() ;
+ return session.createMapMessage();
+ }
+
+ public Message createMessage() throws JMSException
+ {
+ associate() ;
+ return session.createMessage();
+ }
+
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
+ associate() ;
+ return session.createObjectMessage();
+ }
+
+ public ObjectMessage createObjectMessage(Serializable arg0)
+ throws JMSException
+ {
+ associate() ;
+ return session.createObjectMessage(arg0);
+ }
+
+ public MessageProducer createProducer(Destination arg0) throws JMSException
+ {
+ return trackMessageProducer(session.createProducer(arg0));
+ }
+
+ public Queue createQueue(String arg0) throws JMSException
+ {
+ associate() ;
+ return session.createQueue(arg0);
+ }
+
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ associate() ;
+ return session.createStreamMessage();
+ }
+
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ associate() ;
+ return session.createTemporaryQueue();
+ }
+
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ associate() ;
+ return session.createTemporaryTopic();
+ }
+
+ public TextMessage createTextMessage() throws JMSException
+ {
+ associate() ;
+ return session.createTextMessage();
+ }
+
+ public TextMessage createTextMessage(String arg0) throws JMSException
+ {
+ associate() ;
+ return session.createTextMessage(arg0);
+ }
+
+ public Topic createTopic(String arg0) throws JMSException
+ {
+ associate() ;
+ return session.createTopic(arg0);
+ }
+
+ public int getAcknowledgeMode() throws JMSException
+ {
+ associate() ;
+ return session.getAcknowledgeMode();
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ associate() ;
+ return session.getMessageListener();
+ }
+
+ public boolean getTransacted() throws JMSException
+ {
+ associate() ;
+ return session.getTransacted();
+ }
+
+ public void recover() throws JMSException
+ {
+ associate() ;
+ session.recover();
+ }
+
+ public void rollback() throws JMSException
+ {
+ session.rollback();
+ }
+
+ public void run()
+ {
+ session.run();
+ }
+
+ public void setMessageListener(MessageListener arg0) throws JMSException
+ {
+ associate() ;
+ session.setMessageListener(arg0);
+ }
+
+ public void unsubscribe(String arg0) throws JMSException
+ {
+ associate() ;
+ session.unsubscribe(arg0);
+ }
+
+ private synchronized QueueBrowser trackQueueBrowser(QueueBrowser queueBrowser)
+ throws JMSException
+ {
+ associate() ;
+ if (queueBrowserSet == null)
+ {
+ queueBrowserSet = new HashSet<QueueBrowser>() ;
+ }
+ final QueueBrowser result = getQueueBrowser(queueBrowser) ;
+ queueBrowserSet.add(result) ;
+ return result ;
+ }
+
+ private synchronized MessageConsumer trackMessageConsumer(MessageConsumer messageConsumer)
+ throws JMSException
+ {
+ associate() ;
+ if (messageConsumerSet == null)
+ {
+ messageConsumerSet = new HashSet<MessageConsumer>() ;
+ }
+ final MessageConsumer result = getMessageConsumer(messageConsumer) ;
+ messageConsumerSet.add(result) ;
+ return result ;
+ }
+
+ private synchronized TopicSubscriber trackTopicSubscriber(TopicSubscriber topicSubscriber)
+ throws JMSException
+ {
+ associate() ;
+ if (messageConsumerSet == null)
+ {
+ messageConsumerSet = new HashSet<MessageConsumer>() ;
+ }
+ final TopicSubscriber result = getTopicSubscriber(topicSubscriber) ;
+ messageConsumerSet.add(result) ;
+ return result ;
+ }
+
+ private synchronized MessageProducer trackMessageProducer(MessageProducer messageProducer)
+ throws JMSException
+ {
+ associate() ;
+ if (messageProducerSet == null)
+ {
+ messageProducerSet = new HashSet<MessageProducer>() ;
+ }
+ final MessageProducer result = getMessageProducer(messageProducer) ;
+ messageProducerSet.add(result) ;
+ return result ;
+ }
+
+ synchronized void releaseResources()
+ {
+ if (queueBrowserSet != null)
+ {
+ for(QueueBrowser queueBrowser: queueBrowserSet)
+ {
+ try {
+ queueBrowser.close() ;
+ } catch (final JMSException jmse) {} // ignore
+ }
+ queueBrowserSet = null ;
+ }
+ if (messageConsumerSet != null)
+ {
+ for(MessageConsumer messageConsumer: messageConsumerSet)
+ {
+ try {
+ messageConsumer.close() ;
+ } catch (final JMSException jmse) {} // ignore
+ }
+ messageConsumerSet = null ;
+ }
+ if (messageProducerSet != null)
+ {
+ for(MessageProducer messageProducer: messageProducerSet)
+ {
+ try {
+ messageProducer.close() ;
+ } catch (final JMSException jmse) {} // ignore
+ }
+ messageProducerSet = null ;
+ }
+ try
+ {
+ recover() ;
+ }
+ catch (final JMSException jmse) {} // ignore
+ }
+
+ protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
+ {
+ return queueBrowser ;
+ }
+
+ protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer)
+ {
+ return messageConsumer ;
+ }
+
+ protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber)
+ {
+ return topicSubscriber ;
+ }
+
+ protected MessageProducer getMessageProducer(MessageProducer messageProducer)
+ {
+ return messageProducer ;
+ }
+
+ protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ {
+ jmsConnectionPool.handleCloseSession(this) ;
+ }
+
+ protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+ {
+ jmsConnectionPool.handleReleaseSession(this) ;
+ }
+
+ protected void associate()
+ throws JMSException
+ {
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -0,0 +1,230 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.TopicSubscriber;
+import javax.jms.XASession;
+import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
+
+/**
+ * Wrapper for JMS XA session class, responsible for tracking resources and the pooling.
+ */
+class JmsXASession extends JmsSession implements Synchronization
+{
+ /**
+ * The connection pool.
+ */
+ private final JmsConnectionPool pool ;
+
+ /**
+ * The session delegate.
+ */
+ private final XASession session ;
+
+ /**
+ * Flag indicating whether this session is associated with a transaction.
+ */
+ private boolean associated ;
+
+ /**
+ * Cleanup actions
+ */
+ private enum Cleanup { close, release }
+
+ /**
+ * The cleanup action for the synchronization.
+ */
+ private Cleanup cleanupAction = Cleanup.close ;
+
+ /**
+ * Create the session wrapper.
+ * @param pool The current connection pool
+ * @param session The session delegate.
+ * @param isJTA True if this tales part in a JTA transaction
+ * @throws JMSException
+ */
+ JmsXASession(final JmsConnectionPool pool, final XASession session)
+ throws JMSException
+ {
+ super(session) ;
+ this.pool = pool ;
+ this.session = session ;
+ }
+
+ @Override
+ public void commit() throws JMSException
+ {
+ // Handled by the transaction
+ }
+
+ @Override
+ public void rollback() throws JMSException
+ {
+ try
+ {
+ TransactionStrategy.getTransactionStrategy(true).rollbackOnly() ;
+ }
+ catch (final TransactionStrategyException tse)
+ {
+ final JMSException ex = new JMSException("Failed to rollback transaction") ;
+ ex.initCause(tse) ;
+ throw ex ;
+ }
+ }
+
+ @Override
+ protected MessageProducer getMessageProducer(MessageProducer messageProducer)
+ {
+ final InvocationHandler handler = new AssociationHandler(messageProducer) ;
+ return (MessageProducer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageProducer.class}, handler );
+ }
+
+ @Override
+ protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer)
+ {
+ final InvocationHandler handler = new AssociationHandler(messageConsumer) ;
+ return (MessageConsumer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageConsumer.class}, handler);
+ }
+
+ @Override
+ protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
+ {
+ final InvocationHandler handler = new AssociationHandler(queueBrowser) ;
+ return (QueueBrowser)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueBrowser.class}, handler);
+ }
+
+ @Override
+ protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber)
+ {
+ final InvocationHandler handler = new AssociationHandler(topicSubscriber) ;
+ return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
+ }
+
+ protected void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ {
+ cleanupAction = Cleanup.close ;
+ }
+
+ protected void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
+ {
+ cleanupAction = Cleanup.release ;
+ }
+
+ protected synchronized void associate()
+ throws JMSException
+ {
+ if (!associated)
+ {
+ final XAResource resource = session.getXAResource() ;
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ try
+ {
+ transactionStrategy.registerSynchronization(this) ;
+ transactionStrategy.enlistResource(resource) ;
+ }
+ catch (final TransactionStrategyException tse)
+ {
+ final JMSException ex = new JMSException("Failed to initialise transaction resources") ;
+ ex.initCause(tse) ;
+ throw ex ;
+ }
+ try
+ {
+ pool.associateTransaction(this) ;
+ }
+ catch (final ConnectionException ce)
+ {
+ final JMSException ex = new JMSException("Failed to associate session with the current transaction") ;
+ ex.initCause(ce) ;
+ throw ex ;
+ }
+
+ associated = true ;
+ }
+ }
+
+ public void beforeCompletion()
+ {
+ }
+
+ public synchronized void afterCompletion(final int result)
+ {
+ pool.disassociateTransaction(this) ;
+ switch (cleanupAction)
+ {
+ case close:
+ pool.handleCloseSession(this) ;
+ break ;
+ case release:
+ pool.handleReleaseSession(this) ;
+ break ;
+ }
+ associated = false ;
+ }
+
+ /**
+ * Handler responsible for associating XA resources.
+ * @author kevin
+ */
+ private final class AssociationHandler implements InvocationHandler
+ {
+ /**
+ * The target instance.
+ */
+ private final Object target ;
+
+ /**
+ * Construct the handler using the specified target.
+ * @param target The target instance.
+ */
+ public AssociationHandler(final Object target)
+ {
+ this.target = target ;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ associate() ;
+ try
+ {
+ return method.invoke(target, args);
+ }
+ catch (final InvocationTargetException ite)
+ {
+ throw ite.getCause() ;
+ }
+ }
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,7 +34,6 @@
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
-import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
@@ -44,6 +43,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.ActionProcessingException;
import org.jboss.soa.esb.addressing.EPR;
@@ -344,7 +344,7 @@
}
private static class JMSSendQueueSetup {
- Session jmsSession;
+ JmsSession jmsSession;
Destination jmsDestination;
MessageProducer jmsProducer;
String destinationName;
@@ -377,11 +377,11 @@
environment.setProperty(Context.URL_PKG_PREFIXES, jndiPkgPrefix);
Context oCtx = NamingContextPool.getNamingContext(environment);
try {
- pool = JmsConnectionPoolContainer.getPool(environment, connectionFactoryName, JMSEpr.QUEUE_TYPE);
+ pool = JmsConnectionPoolContainer.getPool(environment, connectionFactoryName);
this.destinationName = destinationName;
- jmsSession = pool.getQueueSession();
+ jmsSession = pool.getSession();
boolean clean = true ;
try {
try {
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -529,7 +529,6 @@
} catch (Throwable t) {
logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + targetEPR + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
} finally {
- // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
CourierUtil.cleanCourier(courier);
// put back the old To since we will have changed it.
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/JBossESBPropertyService.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -30,6 +30,7 @@
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.util.XMLHelper;
@@ -278,7 +279,25 @@
throw new TransactionStrategyException("Problem when registering synchronization: ", th);
}
}
+
/**
+ * Add a resource to the current transaction.
+ * @param resource
+ * @throws TransactionStrategyException
+ */
+ public void enlistResource(XAResource resource) throws TransactionStrategyException
+ {
+ try
+ {
+ tm.getTransaction().enlistResource(resource);
+ }
+ catch (final Throwable th)
+ {
+ throw new TransactionStrategyException("Problem when enlisting resource", th);
+ }
+ }
+
+ /**
* Is the currently associated transaction active?
* @return
* @throws TransactionStrategyException
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/TransactionStrategy.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -23,6 +23,7 @@
package org.jboss.soa.esb.common;
import javax.transaction.Synchronization;
+import javax.transaction.xa.XAResource;
/**
* This class represents the transaction strategy that is currently in force
@@ -128,23 +129,13 @@
*/
public abstract void registerSynchronization (Synchronization sync) throws TransactionStrategyException;
- public static void setStrategy (TransactionStrategy txSt)
- {
- _currentStrategy.set(txSt);
- }
+ /**
+ * Add a resource to the current transaction.
+ * @param resource
+ * @throws TransactionStrategyException
+ */
+ public abstract void enlistResource(XAResource resource) throws TransactionStrategyException;
- public static TransactionStrategy getStrategy ()
- {
- return _currentStrategy.get();
- }
-
- public static void removeStrategy ()
- {
- _currentStrategy.remove();
- }
-
- private final static ThreadLocal<TransactionStrategy> _currentStrategy = new ThreadLocal<TransactionStrategy>();
-
/**
* The null transaction strategy.
* @author kevin
@@ -208,6 +199,7 @@
*/
public void resume (Object tx) throws TransactionStrategyException
{
+ throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
}
/**
@@ -217,9 +209,20 @@
*/
public void registerSynchronization (Synchronization sync) throws TransactionStrategyException
{
+ throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
}
/**
+ * Add a resource to the current transaction.
+ * @param resource
+ * @throws TransactionStrategyException
+ */
+ public void enlistResource(XAResource resource) throws TransactionStrategyException
+ {
+ throw new TransactionStrategyException("Unsupported in this transaction strategy") ;
+ }
+
+ /**
* Is the currently associated transaction active?
* @return
* @throws TransactionStrategyException
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,11 +34,7 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -46,6 +42,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -154,9 +151,12 @@
Object obj = _processMethod.invoke(_composer, new Object[] {msgIn});
// commit and acknowledge the reception of the message
// this is done after extracting the content of the JMS Message.
- if ( jmsSession != null && jmsSession.getTransacted() )
- jmsSession.commit();
-
+ if (jmsSession.getTransacted()) {
+ jmsSession.commit() ;
+ } else {
+ msgIn.acknowledge() ;
+ }
+
if (null == obj) {
_logger.warn("Action class method <"
+ _processMethod.getName()
@@ -246,17 +246,28 @@
private void rollbackJMSTransaction()
{
- try
- {
- if ( jmsSession != null && jmsSession.getTransacted() )
- jmsSession.rollback();
- } catch (JMSException e) {
- final String errorMsg = "JMSException during jmsSession.rollback()";
- _logger.error( errorMsg, e );
- }
+ try {
+ if (jmsSession.getTransacted()) {
+ jmsSession.rollback() ;
+ } else {
+ releaseSession() ;
+ }
+ } catch (final JMSException jmse) {
+ releaseSession() ;
+ }
}
+ private void releaseSession() {
+ if (jmsSession != null) {
+ try {
+ jmsConnectionPool.releaseSession(jmsSession);
+ } finally {
+ jmsSession = null;
+ }
+ }
+ }
+
private void cleanup() {
try {
if (_serviceName != null) {
@@ -409,7 +420,7 @@
_myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.ONE_ONE_PROTOCOL, destType,
jmsDestinationName, sFactClass, environment, _messageSelector, persistent, acknowledgeMode,
username, password, transacted );
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, destType,username, password, transacted);
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment, sFactClass, username, password);
try {
jmsSession = _myEpr != null ? jmsConnectionPool.getSession(((JMSEpr)_myEpr).getAcknowledgeMode()):
@@ -429,7 +440,7 @@
jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
}
catch (NamingException ne) {
- if(jmsSession instanceof QueueSession) {
+ if(JMSEpr.QUEUE_TYPE.equals(destType)) {
jmsDestination = jmsSession.createQueue(jmsDestinationName);
} else {
jmsDestination = jmsSession.createTopic(jmsDestinationName);
@@ -440,17 +451,7 @@
NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
- if(jmsSession instanceof QueueSession && jmsDestination instanceof Queue) {
- jmsMessageConsumer = ((QueueSession)jmsSession).createReceiver((Queue)jmsDestination, _messageSelector);
- } else if(jmsSession instanceof TopicSession && jmsDestination instanceof Topic) {
- jmsMessageConsumer = ((TopicSession)jmsSession).createSubscriber((Topic)jmsDestination, _messageSelector, false);
- } else {
- try {
- throw new ConfigurationException("The JMS destination identified by name '" + jmsDestinationName + "' must match it's configured destination type '" + jmsDestinationName + "'.");
- } finally {
- cleanup();
- }
- }
+ jmsMessageConsumer = jmsSession.createConsumer(jmsDestination, _messageSelector);
} // ________________________________
/**
@@ -501,7 +502,7 @@
protected String jmsDestinationName;
- protected Session jmsSession;
+ protected JmsSession jmsSession;
protected Destination jmsDestination;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,8 +32,6 @@
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.common.TransactionStrategy;
-import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
@@ -87,10 +85,6 @@
* The error delay.
*/
private long errorDelay ;
-
- private TransactionStrategy transactionStrategy;
- private boolean transactional = false;
- private boolean rollbackOnPipelineFaults = true;
/**
* public constructor
@@ -162,11 +156,6 @@
}
}
_latencySecs = lSeconds ;
-
- transactional = _config.getBooleanAttribute(ListenerTagNames.TRANSACTED_TAG, false) ;
- transactionStrategy = TransactionStrategy.getTransactionStrategy(transactional) ;
-
- rollbackOnPipelineFaults = _config.getBooleanAttribute(ListenerTagNames.ROLLBACK_ON_PIPELINE_FAULTS, true);
}
/**
@@ -272,53 +261,17 @@
}
}
- /**
- * We have JMS transactional delivery/work semantics: before pulling a unit of work
- * we start a transaction. If the pipeline completes successfully then we will
- * commit that transaction and the OUW will be deleted. If we have to roll back
- * the transaction then the UOW will be placed back on the input "queue" (assumes that
- * the courier is transactional).
- *
- * @param maxWaitMillis
- */
public void waitForEventAndProcess (long maxWaitMillis)
{
Message message = null ;
- boolean problem = false;
-
try
{
- transactionStrategy.begin();
-
- /*
- * If this is a transactional receive then the courier
- * needs to be reset afterwards, because we can only
- * guarantee one instance per transaction. If the courier
- * instance does some handy multiplexing internally across different
- * transactions then we won't be able to handle that at this level.
- * However, at the moment that isn't an issue.
- */
-
- TransactionStrategy.setStrategy(transactionStrategy);
-
message = (maxWaitMillis > 0) ? _pickUpCourier
.pickup(maxWaitMillis) : null;
errorDelay = 0 ;
}
- catch (TransactionStrategyException ex)
- {
- // could not begin transaction!
-
- _logger.error("Could not begin transaction!");
-
- problem = true;
-
- return;
- }
catch (CourierTimeoutException e)
{
- problem = true;
-
return;
}
catch (FaultMessageException fme)
@@ -327,79 +280,35 @@
}
catch (CourierException e)
{
- _logger.debug("Courier Exception", e);
- if (errorDelay == 0)
- {
- errorDelay = MIN_ERROR_DELAY ;
- }
- else if (errorDelay < MAX_ERROR_DELAY)
- {
- errorDelay <<= 1 ;
- }
- e.printStackTrace();
- _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
-
- problem = true;
-
+ _logger.debug("Courier Exception", e);
+ if (errorDelay == 0)
+ {
+ errorDelay = MIN_ERROR_DELAY ;
+ }
+ else if (errorDelay < MAX_ERROR_DELAY)
+ {
+ errorDelay <<= 1 ;
+ }
+ _logger.warn("Error processing courier, backing off for " + errorDelay + " milliseconds") ;
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, errorDelay) ;
return;
}
- finally
- {
- if (problem)
- {
- try
- {
- if (transactionStrategy.getTransaction() != null)
- {
- CourierUtil.cleanCourier(_pickUpCourier);
-
- resetCourier();
- }
- }
- catch (Throwable ex)
- {
- CourierUtil.cleanCourier(_pickUpCourier);
-
- resetCourier(); // to be on the safe side!
- }
-
- rollbackTransaction();
- }
-
- TransactionStrategy.removeStrategy();
- }
if (null != message)
{
- try
- {
- final Message pipelineMessage = message ;
- final Object txHandle = transactionStrategy.suspend();
- final TransactionalRunner txRunner = new TransactionalRunner(_pickUpCourier, pipelineMessage, txHandle);
-
- updateThreadCount(+1);
- _execService.execute(txRunner);
-
- if (transactional)
- {
- _pickUpCourier = null; // runner will clean it up.
-
- resetCourier(); // we need another courier for the next msg.
- }
- }
- catch (TransactionStrategyException ex)
- {
- _logger.warn("Caught transaction related exception: ", ex);
- rollbackTransaction();
- }
+ final Message pipelineMessage = message ;
+ final Runnable pipelineRunner = new Runnable() {
+ public void run() {
+ try {
+ pipeline.process(pipelineMessage) ;
+ } finally {
+ updateThreadCount(-1) ;
+ }
+ }
+ } ;
+ updateThreadCount(+1);
+ _execService.execute(pipelineRunner);
}
- else
- {
- // nothing to do, so roll back the transaction before returning.
-
- rollbackTransaction();
- }
} // ________________________________
@@ -483,134 +392,7 @@
}
}
}
-
- private void rollbackTransaction ()
- {
- try
- {
- transactionStrategy.rollbackOnly();
- transactionStrategy.terminate();
- }
- catch (Throwable ex)
- {
- _logger.warn("Problem while attempting to rollback transaction!"); // timeout should catch it next!
- }
- }
-
- private void resetCourier ()
- {
- TwoWayCourier pickUpCourier = null;
- try
- {
- pickUpCourier = CourierFactory.getPickupCourier(_epr) ;
- try
- {
- final Method setPollLatency = pickUpCourier.getClass().getMethod(
- "setPollLatency", new Class[] { Long.class });
- setPollLatency.invoke(pickUpCourier, new Long(1000 * _latencySecs));
- }
- catch (final NoSuchMethodException nsme)
- {
- // OK, just leave it null
- }
- catch (final Exception ex)
- {
- CourierUtil.cleanCourier(pickUpCourier);
-
- _logger.error("Problems invoking setPollLatency(long)", ex);
- }
- }
- catch (final MalformedEPRException mepre)
- {
- _logger.error("Malformed EPR: " + _epr) ;
- }
- catch (final CourierException ce)
- {
- _logger.error("No appropriate courier can be obtained for " + _epr, ce);
- }
-
- _pickUpCourier = pickUpCourier;
- }
-
- class TransactionalRunner implements Runnable
- {
- public TransactionalRunner (PickUpOnlyCourier courier, Message pipelineMessage, Object txHandle)
- {
- _courier = courier;
- _pipelineMessage = pipelineMessage;
- _txHandle = txHandle;
- }
-
- public void run()
- {
- boolean problem = false;
-
- try
- {
- transactionStrategy.resume(_txHandle);
-
- pipeline.setTransactional(transactional);
-
- TransactionStrategy.setStrategy(transactionStrategy);
-
- /*
- * Current strategy is to commit as long as process returns true.
- * If fails, or any exceptions are caught, then we roll back.
- *
- * TODO re-examine the semantics around true/false from the pipeline.
- */
-
- // TODO consider adding a RollbackOnFalse option to allow override.
-
- problem = rollbackOnPipelineFaults && !pipeline.process(_pipelineMessage);
-
- if (!problem)
- {
- transactionStrategy.terminate();
- }
- }
- catch (TransactionStrategyException ex)
- {
- problem = true;
-
- _logger.warn("TransactionalRunner caught transaction exception: ", ex);
- }
- catch (RuntimeException ex)
- {
- problem = true;
-
- throw ex;
- }
- catch (Throwable ex)
- {
- problem = true;
-
- _logger.warn("TransactionalRunner caught throwable: ",ex);
- }
- finally
- {
- if (problem)
- {
- rollbackTransaction();
- }
-
- if (transactional)
- {
- CourierUtil.cleanCourier(_courier);
- }
-
- TransactionStrategy.removeStrategy();
-
- updateThreadCount(-1);
- }
- }
-
- private PickUpOnlyCourier _courier;
- private Message _pipelineMessage;
- private Object _txHandle;
- };
-
private ConfigTree _config;
private String _eprCategoryName;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -88,18 +88,6 @@
protected abstract void send(final Message p_oMsg, MessageProducer msgProducer ) throws JMSException;
/**
- * Get a session specific to the subclasses implementation
- * i.e Queue or Topic.
- *
- * @param pool
- * @return Session
- * @throws NamingException
- * @throws JMSException
- * @throws ConnectionException
- */
- protected abstract Session getSession( JmsConnectionPool pool ) throws NamingException, JMSException, ConnectionException;
-
- /**
* Creates a message producer specific to the subclasses implementation
* i.e QueueSender or TopicPublisher.
*
@@ -263,7 +251,7 @@
setJMSProperties( esbMessage, jmsMessage );
- for (Iterator II = m_oProps.keySet().iterator(); II.hasNext();)
+ for (Iterator<Object> II = m_oProps.keySet().iterator(); II.hasNext();)
{
String sKey = (String) II.next();
String sVal = m_oProps.getProperty(sKey);
@@ -337,12 +325,11 @@
* Will setup/create JMS connections, sessions, producers.
*
* @param configTrees
- * @param destinationType
* @throws ConfigurationException
* @throws JMSException
* @throws ConnectionException
*/
- protected void setUpProducers (final ConfigTree[] configTrees, final String destinationType) throws ConfigurationException, JMSException, ConnectionException
+ protected void setUpProducers (final ConfigTree[] configTrees) throws ConfigurationException, JMSException, ConnectionException
{
// REVIEW: The connection factory name is hardcoded and is the same as
// that of the queue connection factory.
@@ -379,9 +366,9 @@
String connectionFactory = configTrees[i].getAttribute(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
- connectionPools[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory, destinationType);
- sessions[i] = getSession( connectionPools[i] );
- producers[i] = createProducer( connectionPools[i], sAtt, sessions[i], environment );
+ connectionPools[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory);
+ sessions[i] = connectionPools[i].getSession() ;
+ producers[i] = createProducer( connectionPools[i], sAtt, sessions[i], environment );
final String persistentStr = configTrees[i].getAttribute( PERSISTENT_ATTR, "true" );
deliveryModes[i] = persistentStr.equalsIgnoreCase( "true" ) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -24,12 +24,10 @@
import java.util.Properties;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -38,7 +36,6 @@
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.NamingContextException;
import org.jboss.soa.esb.helpers.NamingContextPool;
@@ -57,7 +54,6 @@
*/
public class NotifyQueues extends NotifyJMS
{
- @SuppressWarnings("unused")
private Logger log = Logger.getLogger( NotifyQueues.class );
/**
@@ -72,64 +68,50 @@
public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
{
super(p_oP);
- setQueues(p_oP.getChildren(CHILD_QUEUE));
+ setUpProducers(p_oP.getChildren(CHILD_QUEUE));
} // __________________________________
-
- protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
- {
- setUpProducers( p_oaP, JMSEpr.QUEUE_TYPE );
- }
protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
{
- QueueSender oCurr = (QueueSender) msgProducer;
if ( log.isDebugEnabled() )
{
log.debug( "Sending to queue with DeliveryMode : " + msgProducer.getDeliveryMode());
log.debug( "Sending to queue with Priority : " + msgProducer.getPriority());
log.debug( "Sending to queue with TTL : " + msgProducer.getTimeToLive());
}
- oCurr.send(p_oMsg);
+ msgProducer.send(p_oMsg);
}
- protected QueueSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
- {
- return pool.getQueueSession();
- }
-
protected MessageProducer createProducer(
final JmsConnectionPool pool,
final String destinationName,
final Session session,
final Properties environment) throws NamingException, JMSException, ConnectionException
{
-
- QueueSession queueSession = (QueueSession) session;
-
try
{
Context context = NamingContextPool.getNamingContext(environment);
try
{
- Queue queue=null;
+ Destination destination=null;
try
{
- queue = (Queue) context.lookup(destinationName);
+ destination = (Destination) context.lookup(destinationName);
}
catch (NamingException ne)
{
context = NamingContextPool.replaceNamingContext(context, environment);
try
{
- queue = (Queue) context.lookup(destinationName);
+ destination = (Destination) context.lookup(destinationName);
}
catch (NamingException nex)
{
//ActiveMQ
- queueSession.createTopic(destinationName);
+ session.createTopic(destinationName);
}
}
- return queueSession.createSender(queue);
+ return session.createProducer(destination);
}
finally
{
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -24,20 +24,17 @@
import java.util.Properties;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.NamingContextException;
import org.jboss.soa.esb.helpers.NamingContextPool;
@@ -56,58 +53,44 @@
{
super(p_oP);
- setTopics(p_oP.getChildren(CHILD_TOPIC));
+ setUpProducers(p_oP.getChildren(CHILD_TOPIC));
} // __________________________________
- protected void setTopics (ConfigTree[] configTrees) throws ConfigurationException, JMSException, ConnectionException
- {
- setUpProducers( configTrees, JMSEpr.TOPIC_TYPE );
- }
-
protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
{
- TopicPublisher oCurr = (TopicPublisher) msgProducer;
- oCurr.publish(p_oMsg);
+ msgProducer.send(p_oMsg);
}
- protected TopicSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
- {
- return pool.getTopicSession();
- }
-
protected MessageProducer createProducer(
final JmsConnectionPool pool,
final String destinationName,
final Session session,
final Properties environment) throws NamingException, JMSException, ConnectionException
{
-
- TopicSession topicSession = (TopicSession) session;
-
try
{
Context context = NamingContextPool.getNamingContext(environment);
try
{
- Topic topic=null;
+ Destination destination=null;
try
{
- topic = (Topic) context.lookup(destinationName);
+ destination = (Destination) context.lookup(destinationName);
}
catch (NamingException ne)
{
context = NamingContextPool.replaceNamingContext(context, environment);
try
{
- topic = (Topic) context.lookup(destinationName);
+ destination = (Destination) context.lookup(destinationName);
}
catch (NamingException nex)
{
//ActiveMQ
- topicSession.createTopic(destinationName);
+ session.createTopic(destinationName);
}
}
- return topicSession.createPublisher(topic);
+ return session.createProducer(destination);
}
finally
{
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/schedule/ScheduleProvider.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -136,6 +136,7 @@
// this is just to make sure they're unique - i.e. so as 1+
// "things" can listen to the same schedule...
+ // This is not thread safe!
name += ("-" + nameDelta++);
trigger.setName(name);
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierIntegrationTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -34,7 +34,6 @@
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.couriers.CourierException;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -54,7 +53,7 @@
public void getJmsSession() throws CourierException, JMSException
{
Session jmsSession = jmsCourier.getJmsSession();
- assertTrue( jmsSession.getTransacted() );
+ assertFalse( jmsSession.getTransacted() );
jmsCourier.cleanup();
}
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainerUnitTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -48,10 +48,9 @@
public void createPoolKey_null_environment()
{
final Properties env = null;
- Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE );
+ Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory);
assertEquals( connectionFactory, poolKey.get( CONNECTION_FACTORY_TAG ) );
- assertEquals( QUEUE_TYPE, poolKey.get( DESTINATION_TYPE_TAG ) );
}
@Test
@@ -62,7 +61,7 @@
final Properties env = new Properties();
env.put( Context.SECURITY_PRINCIPAL, username );
env.put( Context.SECURITY_CREDENTIALS, password );
- Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE);
+ Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory);
assertEquals( username, poolKey.get( Context.SECURITY_PRINCIPAL ) );
assertEquals( password, poolKey.get( Context.SECURITY_CREDENTIALS ) );
}
@@ -73,32 +72,12 @@
final String username = "daniel";
final String password = "passwd";
final Properties env = null;
- Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
- username, password, false);
+ Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory,
+ username, password);
assertEquals( username, poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG ) );
assertEquals( password, poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG ) );
}
- @Test
- public void createPoolKey_with_transacted_properties()
- {
- final boolean transacted = true;
- final Properties env = null;
- Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
- null, null, transacted);
- assertEquals( transacted, Boolean.valueOf ( poolKey.get( JMSEpr.TRANSACTED_TAG ) ) );
- }
-
- @Test
- public void createPoolKey_with_non_transacted_properties()
- {
- final boolean transacted = false;
- final Properties env = null;
- Map<String, String> poolKey = JmsConnectionPoolContainer.createPoolKey( env, connectionFactory, QUEUE_TYPE,
- null, null, transacted);
- assertEquals( transacted, Boolean.valueOf ( poolKey.get( JMSEpr.TRANSACTED_TAG ) ) );
- }
-
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter( JmsConnectionPoolContainerUnitTest.class );
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -131,14 +131,6 @@
{
return null;
}
-
- @Override
- protected Session getSession( JmsConnectionPool pool )
- throws NamingException, JMSException, ConnectionException
- {
- return null;
- }
-
}
private static class MockJMSSession implements Session
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -36,6 +36,7 @@
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -265,10 +266,44 @@
}
else
{
- return method.invoke(queueConnection, args) ;
+ final Object response = method.invoke(queueConnection, args) ;
+ if (response instanceof QueueSession)
+ {
+ final QueueSession queueSession = (QueueSession)response ;
+ return (QueueSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueSession.class},
+ new MockQueueSessionInvocationHandler(queueSession)) ;
+ }
+ else
+ {
+ return response ;
+ }
}
}
}
+
+ private static final class MockQueueSessionInvocationHandler implements InvocationHandler
+ {
+ private final QueueSession queueSession ;
+
+ MockQueueSessionInvocationHandler(final QueueSession queueSession)
+ {
+ this.queueSession = queueSession ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("recover".equals(methodName))
+ {
+ return null ;
+ }
+ else
+ {
+ return method.invoke(queueSession, args) ;
+ }
+ }
+ }
public static junit.framework.Test suite()
{
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -33,6 +33,7 @@
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -217,8 +218,42 @@
}
else
{
- return method.invoke(topicConnection, args) ;
+ final Object response = method.invoke(topicConnection, args) ;
+ if (response instanceof TopicSession)
+ {
+ final TopicSession topicSession = (TopicSession)response ;
+ return (TopicSession)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSession.class},
+ new MockTopicSessionInvocationHandler(topicSession)) ;
}
+ else
+ {
+ return response ;
+ }
}
}
+ }
+
+ private static final class MockTopicSessionInvocationHandler implements InvocationHandler
+ {
+ private final TopicSession topicSession ;
+
+ MockTopicSessionInvocationHandler(final TopicSession topicSession)
+ {
+ this.topicSession = topicSession ;
+ }
+
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable
+ {
+ final String methodName = method.getName() ;
+ if ("recover".equals(methodName))
+ {
+ return null ;
+ }
+ else
+ {
+ return method.invoke(topicSession, args) ;
+ }
+ }
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingIntegrationTest.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -32,7 +32,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
-import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
import org.jboss.soa.esb.common.Environment;
import org.junit.Before;
import org.junit.Test;
@@ -57,14 +57,14 @@
{
JmsConnectionPool jmsConnectionPool = null;
- jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory", JMSEpr.QUEUE_TYPE);
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
assertEquals(0, jmsConnectionPool.getSessionsInPool());
//Open 3 concurrent sessions
- Session session1 = jmsConnectionPool.getQueueSession();
+ JmsSession session1 = jmsConnectionPool.getSession();
assertEquals(1, jmsConnectionPool.getSessionsInPool());
- Session session2 = jmsConnectionPool.getQueueSession();
+ JmsSession session2 = jmsConnectionPool.getSession();
assertEquals(2, jmsConnectionPool.getSessionsInPool());
- Session session3 = jmsConnectionPool.getQueueSession();
+ JmsSession session3 = jmsConnectionPool.getSession();
assertEquals(3, jmsConnectionPool.getSessionsInPool());
//Close them
jmsConnectionPool.closeSession(session1);
@@ -76,8 +76,8 @@
assertEquals(0, jmsConnectionPool.getSessionsInPool());
assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
- //Use it again and add one session
- jmsConnectionPool.getQueueSession();
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ jmsConnectionPool.getSession();
assertEquals(1, jmsConnectionPool.getSessionsInPool());
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
//I should be able to remove the entire pool and have it do closing
@@ -88,61 +88,55 @@
@Test
public void testCreateSecondPool() throws Exception
{
- JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
- jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+ JmsConnectionPool jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
+ jmsConnectionPool1 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory");
//This should be the same pool
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
- JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(environment, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+ JmsConnectionPool jmsConnectionPool2 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory");
//This should be a different pool, so now we should have 2.
assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
- JmsConnectionPool jmsConnectionPool3 = JmsConnectionPoolContainer.getPool(null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
- //This should be a different pool, so now we should have 3.
- assertEquals(3, JmsConnectionPoolContainer.getNumberOfPools());
-
//Now lets cleanup after ourselves
- jmsConnectionPool3.removeSessionPool();
- assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+ jmsConnectionPool2.removeSessionPool();
+ assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
jmsConnectionPool1.removeSessionPool();
- jmsConnectionPool2.removeSessionPool();
- jmsConnectionPool3.removeSessionPool();
assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
}
@Test
public void testPoolAndSessionsWithAcknowledgeMode() throws Exception
{
- JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory", JMSEpr.QUEUE_TYPE);
+ JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
assertEquals(0, jmsConnectionPool.getSessionsInPool());
- Session autoAckSession1 = jmsConnectionPool.getQueueSession(Session.AUTO_ACKNOWLEDGE);
+ JmsSession autoAckSession1 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
assertEquals(Session.AUTO_ACKNOWLEDGE, autoAckSession1.getAcknowledgeMode());
assertEquals(1, jmsConnectionPool.getSessionsInPool());
assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.AUTO_ACKNOWLEDGE));
- Session autoAckSession2 = jmsConnectionPool.getQueueSession(Session.AUTO_ACKNOWLEDGE);
+ JmsSession autoAckSession2 = jmsConnectionPool.getSession(Session.AUTO_ACKNOWLEDGE);
assertEquals(Session.AUTO_ACKNOWLEDGE, autoAckSession2.getAcknowledgeMode());
assertEquals(2, jmsConnectionPool.getSessionsInPool());
assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.AUTO_ACKNOWLEDGE));
- Session clientAckSession1 = jmsConnectionPool.getQueueSession(Session.CLIENT_ACKNOWLEDGE);
+ JmsSession clientAckSession1 = jmsConnectionPool.getSession(Session.CLIENT_ACKNOWLEDGE);
assertEquals(Session.CLIENT_ACKNOWLEDGE, clientAckSession1.getAcknowledgeMode());
assertEquals(3, jmsConnectionPool.getSessionsInPool());
assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.CLIENT_ACKNOWLEDGE));
- Session clientAckSession2 = jmsConnectionPool.getQueueSession(Session.CLIENT_ACKNOWLEDGE);
+ JmsSession clientAckSession2 = jmsConnectionPool.getSession(Session.CLIENT_ACKNOWLEDGE);
assertEquals(Session.CLIENT_ACKNOWLEDGE, clientAckSession2.getAcknowledgeMode());
assertEquals(4, jmsConnectionPool.getSessionsInPool());
assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.CLIENT_ACKNOWLEDGE));
- Session dupsOkAcSession1 = jmsConnectionPool.getQueueSession(Session.DUPS_OK_ACKNOWLEDGE);
+ JmsSession dupsOkAcSession1 = jmsConnectionPool.getSession(Session.DUPS_OK_ACKNOWLEDGE);
assertEquals(Session.DUPS_OK_ACKNOWLEDGE, dupsOkAcSession1.getAcknowledgeMode());
assertEquals(5, jmsConnectionPool.getSessionsInPool());
assertEquals(1, jmsConnectionPool.getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE));
- Session dupsOkAcSession2 = jmsConnectionPool.getQueueSession(Session.DUPS_OK_ACKNOWLEDGE);
+ JmsSession dupsOkAcSession2 = jmsConnectionPool.getSession(Session.DUPS_OK_ACKNOWLEDGE);
assertEquals(Session.DUPS_OK_ACKNOWLEDGE, dupsOkAcSession2.getAcknowledgeMode());
assertEquals(6, jmsConnectionPool.getSessionsInPool());
assertEquals(2, jmsConnectionPool.getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE));
@@ -182,7 +176,8 @@
assertEquals(0, jmsConnectionPool.getSessionsInPool());
assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
- jmsConnectionPool.getQueueSession();
+ jmsConnectionPool = JmsConnectionPoolContainer.getPool(environment,"ConnectionFactory");
+ jmsConnectionPool.getSession();
assertEquals(1, jmsConnectionPool.getSessionsInPool());
assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/DataFiler.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -162,7 +162,7 @@
* @param f_sb statistics bean
*/
public void insertStatistics(StatisticsBean f_sb) {
- TransactionStrategy txStrategy = TransactionStrategy.getStrategy();
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = null;
if (txStrategy != null)
Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/InvokerFiler.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -67,7 +67,7 @@
* @see org.jboss.soa.esb.monitoring.server.Filer#persistData()
*/
public void persistData() {
- TransactionStrategy txS = TransactionStrategy.getStrategy();
+ TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = null;
if (txS != null)
Modified: labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java
===================================================================
--- labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java 2008-03-21 11:34:08 UTC (rev 19171)
+++ labs/jbossesb/trunk/product/tools/console/management/src/main/java/org/jboss/soa/esb/monitoring/server/OperationsFiler.java 2008-03-21 12:27:47 UTC (rev 19172)
@@ -95,7 +95,7 @@
public void insertOperations(ServiceControlCommand f_ob) {
Session sess = null;
Transaction tx = null;
- TransactionStrategy txS = TransactionStrategy.getStrategy();
+ TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = null;
if (txS != null)
@@ -158,7 +158,7 @@
public void updateActiveFlag(String serverName) {
Session sess = null;
Transaction tx = null;
- TransactionStrategy txS = TransactionStrategy.getStrategy();
+ TransactionStrategy txS = TransactionStrategy.getTransactionStrategy(true);
Object txHandle = null;
if (txS != null)
More information about the jboss-svn-commits
mailing list