[jboss-svn-commits] JBL Code SVN: r35463 - in labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta: tests/src/org/jboss/internal/soa/esb/rosetta/pooling and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Oct 8 08:42:14 EDT 2010
Author: kevin.conner at jboss.com
Date: 2010-10-08 08:42:13 -0400 (Fri, 08 Oct 2010)
New Revision: 35463
Modified:
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
Log:
Track session users, producers/consumers and change caching of XA sessions: JBESB-3201
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2010-10-08 12:42:13 UTC (rev 35463)
@@ -71,6 +71,7 @@
private static int CONFIGURED_POOL_SIZE = DEFAULT_POOL_SIZE;
private static int CONFIGURED_SLEEP = DEFAULT_SLEEP;
+ static final int XA_TRANSACTED = -1 ;
/**
* The executor used to create sessions.
*/
@@ -245,11 +246,12 @@
final JmsXASession currentSession = getXASession() ;
if (currentSession != null)
{
+ currentSession.incrementReferenceCount() ;
return currentSession ;
}
}
- final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
+ final int mode = (transacted ? XA_TRANSACTED : acknowledgeMode) ;
final long end = System.currentTimeMillis() + (sleepTime * 1000) ;
boolean emitExpiry = LOGGER.isDebugEnabled() ;
@@ -512,7 +514,8 @@
return getSessionsInPool(Session.AUTO_ACKNOWLEDGE) +
getSessionsInPool(Session.CLIENT_ACKNOWLEDGE) +
getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE) +
- getSessionsInPool(Session.SESSION_TRANSACTED) ;
+ getSessionsInPool(Session.SESSION_TRANSACTED) +
+ getSessionsInPool(XA_TRANSACTED) ;
}
/**
@@ -596,7 +599,7 @@
* @param session The XA session.
* @throws ConnectionException if there is no transaction active.
*/
- synchronized void associateTransaction(final JmsXASession session)
+ void associateTransaction(final JmsXASession session)
throws ConnectionException
{
JmsSessionPool sessionPool = findOwnerPool(session);
@@ -610,7 +613,7 @@
* Disassociate the JMS XA Session from a transaction.
* @param session The XA session.
*/
- synchronized void disassociateTransaction(final JmsXASession session)
+ void disassociateTransaction(final JmsXASession session)
{
JmsSessionPool sessionPool = findOwnerPool(session);
@@ -618,7 +621,22 @@
sessionPool.disassociateTransaction(session);
}
}
+
+ /**
+ * Check that the session is associated with the current transaction.
+ * @param session The XA session.
+ */
+ boolean isAssociated(final JmsXASession session)
+ {
+ JmsSessionPool sessionPool = findOwnerPool(session);
+ if(sessionPool != null) {
+ return sessionPool.isAssociated(session);
+ } else {
+ return false ;
+ }
+ }
+
JmsSessionPool findOwnerPool(final JmsSession session) {
return session.getSessionPool() ;
}
@@ -715,10 +733,12 @@
freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
}
public synchronized void removeSessionPool() {
@@ -746,6 +766,11 @@
if (freeSessions.size() > 0)
{
final JmsSession session = freeSessions.remove(freeSessions.size()-1);
+ if (transacted)
+ {
+ final JmsXASession xaSession = (JmsXASession)session ;
+ xaSession.incrementReferenceCount() ;
+ }
inUseSessions.add(session);
return session ;
} else if (getSessionsInPool() < maxSessionsPerConnection) {
@@ -795,8 +820,8 @@
{
final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
- freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
- inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(XA_TRANSACTED, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(XA_TRANSACTED, new ArrayList<JmsSession>() );
}
else if (factoryConnection instanceof ConnectionFactory)
{
@@ -848,7 +873,9 @@
final JmsSession session ;
if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, JmsSessionPool.this, ((XAConnection)currentConnection).createXASession(), currentID, acknowledgeMode);
+ final JmsXASession xaSession = new JmsXASession(JmsConnectionPool.this, JmsSessionPool.this, ((XAConnection)currentConnection).createXASession(), currentID, acknowledgeMode);
+ xaSession.incrementReferenceCount() ;
+ session = xaSession ;
} else {
session = new JmsSession(JmsConnectionPool.this, JmsSessionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID, acknowledgeMode);
}
@@ -1004,7 +1031,7 @@
* @param session The XA session.
* @throws ConnectionException if there is no transaction active.
*/
- synchronized void associateTransaction(final JmsXASession session)
+ void associateTransaction(final JmsXASession session)
throws ConnectionException
{
final Object tx = getTransaction() ;
@@ -1012,20 +1039,51 @@
{
throw new ConnectionException("No active transaction") ;
}
- transactionsToSessions.put(tx, session) ;
- sessionsToTransactions.put(session, tx) ;
+ synchronized (this)
+ {
+ transactionsToSessions.put(tx, session) ;
+ sessionsToTransactions.put(session, tx) ;
+ }
}
/**
* Disassociate the JMS XA Session from a transaction.
* @param session The XA session.
*/
- synchronized void disassociateTransaction(final JmsXASession session)
+ void disassociateTransaction(final JmsXASession session)
{
- final Object tx = sessionsToTransactions.remove(session) ;
- transactionsToSessions.remove(tx) ;
+ final Object tx ;
+ synchronized(this)
+ {
+ tx = sessionsToTransactions.remove(session) ;
+ transactionsToSessions.remove(tx) ;
+ }
}
+ /**
+ * Check that the session is associated with the current transaction.
+ * @param session The XA session.
+ */
+ boolean isAssociated(final JmsXASession session)
+ {
+ try
+ {
+ final Object current = TransactionStrategy.getTransactionStrategy(true).getTransaction() ;
+ final Object tx ;
+ synchronized(this)
+ {
+ tx = sessionsToTransactions.get(session) ;
+ }
+ if (tx != null)
+ {
+ return tx.equals(current) ;
+ }
+ }
+ catch (final TransactionStrategyException tse) {} // ignore
+
+ return false ;
+ }
+
public void releaseInUseSession(JmsSession session) {
final int mode = session.getRequestedAcknowledgeMode() ;
@@ -1053,7 +1111,7 @@
final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
if (sessions != null) {
- sessions.add(session) ;
+ sessions.add(session.duplicateSession()) ;
}
}
session.releaseResources() ;
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2010-10-08 12:42:13 UTC (rev 35463)
@@ -63,21 +63,29 @@
*/
private final JmsSessionPool sessionPool ;
/**
- * The session delegate.
+ * The underlying session.
*/
private final Session session ;
/**
+ * The session delegate.
+ */
+ private final Session sessionDelegate ;
+ /**
* The pool instance id.
*/
private final long id ;
/**
* The session acknowledge mode.
*/
- private final int acknowledgeMode ;
+ protected final int acknowledgeMode ;
/**
* The requested acknowledge mode for this session.
*/
private final int requestedAcknowledgeMode ;
+ /**
+ * The flag indicating whether this session is invalid.
+ */
+ protected volatile boolean invalid ;
/**
* Flag indicating whether this session is suspect or not.
@@ -98,6 +106,27 @@
private HashSet<MessageProducer> messageProducerSet ;
/**
+ * Duplicate the session wrapper.
+ * @param connectionPool The connection pool associated with this session.
+ * @param sessionPool The session pool associated with this session.
+ * @param session The wrapped session delegate.
+ * @param id The pool instance id.
+ * @param requestedAcknowledgeMode The requested acknowledge mode for this session.
+ * @param acknowledgeMode The original acknowledge mode for this session.
+ */
+ JmsSession(final JmsConnectionPool connectionPool, final JmsSessionPool sessionPool, final Session session, final long id, final int requestedAcknowledgeMode,
+ final int acknowledgeMode)
+ {
+ this.connectionPool = connectionPool ;
+ this.sessionPool = sessionPool ;
+ this.id = id ;
+ this.session = session ;
+ this.sessionDelegate = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
+ this.requestedAcknowledgeMode = requestedAcknowledgeMode ;
+ this.acknowledgeMode = acknowledgeMode ;
+ }
+
+ /**
* Create the session wrapper.
* @param connectionPool The connection pool associated with this session.
* @param sessionPool The session pool associated with this session.
@@ -112,7 +141,8 @@
this.connectionPool = connectionPool ;
this.sessionPool = sessionPool ;
this.id = id ;
- this.session = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
+ this.session = session ;
+ this.sessionDelegate = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
this.requestedAcknowledgeMode = requestedAcknowledgeMode ;
acknowledgeMode = session.getAcknowledgeMode() ;
// Workaround for JBESB-1873
@@ -129,132 +159,132 @@
public void close() throws JMSException
{
- session.close();
+ sessionDelegate.close();
}
public void commit() throws JMSException
{
setSuspect(true) ;
- session.commit();
+ sessionDelegate.commit();
setSuspect(false) ;
}
public QueueBrowser createBrowser(Queue arg0, String arg1)
throws JMSException
{
- return trackQueueBrowser(session.createBrowser(arg0, arg1));
+ return trackQueueBrowser(sessionDelegate.createBrowser(arg0, arg1));
}
public QueueBrowser createBrowser(Queue arg0) throws JMSException
{
- return trackQueueBrowser(session.createBrowser(arg0));
+ return trackQueueBrowser(sessionDelegate.createBrowser(arg0));
}
public BytesMessage createBytesMessage() throws JMSException
{
associate() ;
- return session.createBytesMessage();
+ return sessionDelegate.createBytesMessage();
}
public MessageConsumer createConsumer(Destination arg0, String arg1,
boolean arg2) throws JMSException
{
- return trackMessageConsumer(session.createConsumer(arg0, arg1, arg2));
+ return trackMessageConsumer(sessionDelegate.createConsumer(arg0, arg1, arg2));
}
public MessageConsumer createConsumer(Destination arg0, String arg1)
throws JMSException
{
- return trackMessageConsumer(session.createConsumer(arg0, arg1));
+ return trackMessageConsumer(sessionDelegate.createConsumer(arg0, arg1));
}
public MessageConsumer createConsumer(Destination arg0) throws JMSException
{
- return trackMessageConsumer(session.createConsumer(arg0));
+ return trackMessageConsumer(sessionDelegate.createConsumer(arg0));
}
public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1,
String arg2, boolean arg3) throws JMSException
{
- return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1, arg2, arg3));
+ return trackTopicSubscriber(sessionDelegate.createDurableSubscriber(arg0, arg1, arg2, arg3));
}
public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1)
throws JMSException
{
- return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1));
+ return trackTopicSubscriber(sessionDelegate.createDurableSubscriber(arg0, arg1));
}
public MapMessage createMapMessage() throws JMSException
{
associate() ;
- return session.createMapMessage();
+ return sessionDelegate.createMapMessage();
}
public Message createMessage() throws JMSException
{
associate() ;
- return session.createMessage();
+ return sessionDelegate.createMessage();
}
public ObjectMessage createObjectMessage() throws JMSException
{
associate() ;
- return session.createObjectMessage();
+ return sessionDelegate.createObjectMessage();
}
public ObjectMessage createObjectMessage(Serializable arg0)
throws JMSException
{
associate() ;
- return session.createObjectMessage(arg0);
+ return sessionDelegate.createObjectMessage(arg0);
}
public MessageProducer createProducer(Destination arg0) throws JMSException
{
- return trackMessageProducer(session.createProducer(arg0));
+ return trackMessageProducer(sessionDelegate.createProducer(arg0));
}
public Queue createQueue(String arg0) throws JMSException
{
associate() ;
- return session.createQueue(arg0);
+ return sessionDelegate.createQueue(arg0);
}
public StreamMessage createStreamMessage() throws JMSException
{
associate() ;
- return session.createStreamMessage();
+ return sessionDelegate.createStreamMessage();
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
associate() ;
- return session.createTemporaryQueue();
+ return sessionDelegate.createTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
associate() ;
- return session.createTemporaryTopic();
+ return sessionDelegate.createTemporaryTopic();
}
public TextMessage createTextMessage() throws JMSException
{
associate() ;
- return session.createTextMessage();
+ return sessionDelegate.createTextMessage();
}
public TextMessage createTextMessage(String arg0) throws JMSException
{
associate() ;
- return session.createTextMessage(arg0);
+ return sessionDelegate.createTextMessage(arg0);
}
public Topic createTopic(String arg0) throws JMSException
{
associate() ;
- return session.createTopic(arg0);
+ return sessionDelegate.createTopic(arg0);
}
public int getAcknowledgeMode() throws JMSException
@@ -265,43 +295,43 @@
public MessageListener getMessageListener() throws JMSException
{
associate() ;
- return session.getMessageListener();
+ return sessionDelegate.getMessageListener();
}
public boolean getTransacted() throws JMSException
{
associate() ;
- return session.getTransacted();
+ return sessionDelegate.getTransacted();
}
public void recover() throws JMSException
{
associate() ;
- session.recover();
+ sessionDelegate.recover();
}
public void rollback() throws JMSException
{
setSuspect(true) ;
- session.rollback();
+ sessionDelegate.rollback();
setSuspect(false) ;
}
public void run()
{
- session.run();
+ sessionDelegate.run();
}
public void setMessageListener(MessageListener arg0) throws JMSException
{
associate() ;
- session.setMessageListener(arg0);
+ sessionDelegate.setMessageListener(arg0);
}
public void unsubscribe(String arg0) throws JMSException
{
associate() ;
- session.unsubscribe(arg0);
+ sessionDelegate.unsubscribe(arg0);
}
private synchronized QueueBrowser trackQueueBrowser(QueueBrowser queueBrowser)
@@ -423,12 +453,21 @@
protected void associate()
throws JMSException
{
+ if (invalid)
+ {
+ throw new JMSException("Session no longer valid") ;
+ }
}
protected JmsSessionPool getSessionPool()
{
return sessionPool ;
}
+
+ protected Session getSession()
+ {
+ return session ;
+ }
protected void setSuspect(final boolean suspect)
{
@@ -444,6 +483,17 @@
{
return requestedAcknowledgeMode ;
}
+
+ public boolean isInvalid()
+ {
+ return invalid ;
+ }
+
+ public JmsSession duplicateSession()
+ {
+ invalid = true ;
+ return new JmsSession(connectionPool, sessionPool, session, id, requestedAcknowledgeMode, acknowledgeMode) ;
+ }
/**
* Wrap the object in an exception handler.
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2010-10-08 12:42:13 UTC (rev 35463)
@@ -25,14 +25,16 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
+import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.jms.XASession;
-import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.xa.XAResource;
@@ -53,22 +55,42 @@
/**
* The session delegate.
*/
- private final XASession session ;
+ private final XASession xaSessionDelegate ;
/**
- * Flag indicating whether this session is associated with a transaction.
+ * Cleanup actions
*/
- private boolean associated ;
+ private enum Cleanup { close, release }
/**
- * Cleanup actions
+ * The cleanup action for the synchronization.
*/
- private enum Cleanup { close, release, none }
+ private Cleanup cleanupAction = Cleanup.close ;
+ /**
+ * Flag representing whether the handlers are valid or not.
+ */
+ private AtomicBoolean handlerValid = new AtomicBoolean(true) ;
+ /**
+ * Use count for tracking references.
+ */
+ private AtomicInteger refCount = new AtomicInteger(0) ;
/**
- * The cleanup action for the synchronization.
+ * Duplicate the session wrapper.
+ * @param connectionPool The current connection pool
+ * @param sessionPool The current session pool
+ * @param session The underlying session.
+ * @param id The pool instance id.
+ * @param requestedAcknowledgeMode The requested acknowledge mode for this session.
+ * @param acknowledgeMode The original acknowledge mode for this session.
*/
- private Cleanup cleanupAction = Cleanup.none ;
+ JmsXASession(final JmsConnectionPool connectionPool, final JmsSessionPool sessionPool, final Session session,
+ final long id, final int requestedAcknowledgeMode, final int acknowledgeMode)
+ {
+ super(connectionPool, sessionPool, session, id, requestedAcknowledgeMode, acknowledgeMode) ;
+ this.connectionPool = connectionPool ;
+ this.xaSessionDelegate = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
+ }
/**
* Create the session wrapper.
@@ -84,7 +106,7 @@
{
super(connectionPool, sessionPool, session, id, requestedAcknowledgeMode) ;
this.connectionPool = connectionPool ;
- this.session = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
+ this.xaSessionDelegate = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
}
@Override
@@ -111,67 +133,119 @@
@Override
protected MessageProducer getMessageProducer(MessageProducer messageProducer)
{
- final InvocationHandler handler = new AssociationHandler(messageProducer) ;
+ final InvocationHandler handler = new AssociationHandler(messageProducer, handlerValid) ;
return (MessageProducer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageProducer.class}, handler );
}
@Override
protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer)
{
- final InvocationHandler handler = new AssociationHandler(messageConsumer) ;
+ final InvocationHandler handler = new AssociationHandler(messageConsumer, handlerValid) ;
return (MessageConsumer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageConsumer.class}, handler);
}
@Override
protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
{
- final InvocationHandler handler = new AssociationHandler(queueBrowser) ;
+ final InvocationHandler handler = new AssociationHandler(queueBrowser, handlerValid) ;
return (QueueBrowser)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueBrowser.class}, handler);
}
@Override
protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber)
{
- final InvocationHandler handler = new AssociationHandler(topicSubscriber) ;
+ final InvocationHandler handler = new AssociationHandler(topicSubscriber, handlerValid) ;
return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
}
- protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ private void releaseSession()
{
- if (associated)
+ try
{
- cleanupAction = Cleanup.close ;
+ connectionPool.handleReleaseSession(this) ;
}
- else
+ finally
{
+ handlerValid.set(false) ;
+ handlerValid = new AtomicBoolean(true) ;
+ }
+ }
+
+ private void closeSession()
+ {
+ try
+ {
connectionPool.handleCloseSession(this) ;
}
+ finally
+ {
+ handlerValid.set(false) ;
+ handlerValid = new AtomicBoolean(true) ;
+ }
}
+
+ public JmsSession duplicateSession()
+ {
+ invalid = true ;
+ return new JmsXASession(connectionPool, getSessionPool(), getSession(), getId(), getRequestedAcknowledgeMode(), acknowledgeMode) ;
+ }
+ protected void incrementReferenceCount()
+ {
+ refCount.incrementAndGet() ;
+ }
+
+ protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+ {
+ final boolean associated = connectionPool.isAssociated(this) ;
+ final int count = refCount.decrementAndGet() ;
+ if (!associated && (count == 0))
+ {
+ if (cleanupAction == Cleanup.release)
+ {
+ releaseSession() ;
+ }
+ else
+ {
+ closeSession() ;
+ }
+ }
+ }
+
protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
{
+ final boolean associated = connectionPool.isAssociated(this) ;
+ final int count = refCount.decrementAndGet() ;
if (associated)
{
cleanupAction = Cleanup.release ;
}
- else
+ else if (count == 0)
{
- connectionPool.handleReleaseSession(this) ;
+ releaseSession() ;
}
}
protected synchronized void associate()
throws JMSException
{
- if (!associated)
+ if (invalid)
{
- cleanupAction = Cleanup.none ;
+ throw new JMSException("Session no longer valid") ;
+ }
+ if (!connectionPool.isAssociated(this))
+ {
final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
try
{
+ if (!transactionStrategy.isActive())
+ {
+ throw new JMSException("No active transaction") ;
+ }
+
transactionStrategy.registerSynchronization(this) ;
setSuspect(true) ;
- final XAResource resource = session.getXAResource() ;
+ final XAResource resource = xaSessionDelegate.getXAResource() ;
transactionStrategy.enlistResource(resource) ;
}
catch (final TransactionStrategyException tse)
@@ -192,7 +266,6 @@
}
setSuspect(false) ;
- associated = true ;
}
}
@@ -203,22 +276,17 @@
public synchronized void afterCompletion(final int result)
{
connectionPool.disassociateTransaction(this) ;
- switch (cleanupAction)
+ if (refCount.get() == 0)
{
- case close:
- if (result == Status.STATUS_COMMITTED)
+ if (cleanupAction == Cleanup.close)
{
- connectionPool.handleCloseSession(this) ;
- break ;
+ closeSession() ;
}
- // fall through
- case release:
- connectionPool.handleReleaseSession(this) ;
- break ;
- case none:
- // Reference held by caller
+ else
+ {
+ releaseSession() ;
+ }
}
- associated = false ;
}
/**
@@ -231,27 +299,55 @@
* The target instance.
*/
private final Object target ;
+ /**
+ * Flag representing validity of the handler.
+ */
+ private final AtomicBoolean handlerValid ;
/**
* Construct the handler using the specified target.
* @param target The target instance.
+ * @param handlerValid The flag representing whether the handler is still valid.
*/
- public AssociationHandler(final Object target)
+ public AssociationHandler(final Object target, final AtomicBoolean handlerValid)
{
this.target = target ;
+ this.handlerValid = handlerValid ;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
- associate() ;
- try
+ final String methodName = method.getName() ;
+ if ("hashCode".equals(methodName))
{
- return method.invoke(target, args);
+ return hashCode() ;
}
- catch (final InvocationTargetException ite)
+ else if ("equals".equals(methodName))
{
- throw ite.getCause() ;
+ return this.equals(args[0]) ;
}
+ else
+ {
+ if (handlerValid.get())
+ {
+ if (!"close".equals(methodName))
+ {
+ associate() ;
+ }
+ try
+ {
+ return method.invoke(target, args);
+ }
+ catch (final InvocationTargetException ite)
+ {
+ throw ite.getCause() ;
+ }
+ }
+ else
+ {
+ throw new JMSException("Instance is no longer valid") ;
+ }
+ }
}
}
}
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java 2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java 2010-10-08 12:42:13 UTC (rev 35463)
@@ -24,6 +24,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +43,7 @@
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.Context;
+import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.xa.XAResource;
@@ -136,7 +138,7 @@
final JmsSession session2 = pool.getSession() ;
Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
- Assert.assertSame("Same session returned", session, session2) ;
+ Assert.assertSame("Same session returned", session.getSession(), session2.getSession()) ;
}
@Test
@@ -360,7 +362,7 @@
{
final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
// transactional sessions are requested with transacted acknowledge mode
- final int acknowledgeMode = Session.SESSION_TRANSACTED ;
+ final int acknowledgeMode = JmsConnectionPool.XA_TRANSACTED ;
Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
final JmsSession session = pool.getSession() ;
@@ -621,6 +623,98 @@
}
}
+ @Test
+ public void testXaSessionReuse()
+ throws Exception
+ {
+ final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+ final MockJTATransactionStrategy jtaTransactionStrategy = new MockJTATransactionStrategy() ;
+ TransactionStrategy.setTransactionStrategy(jtaTransactionStrategy) ;
+ try
+ {
+ final int acknowledgeMode = JmsConnectionPool.XA_TRANSACTED ;
+
+ final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ jtaTransactionStrategy.begin() ;
+
+ final JmsSession firstSession = pool.getSession() ;
+
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ jtaTransactionStrategy.terminateTransaction(true) ;
+
+ // make sure it is still in use
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ // should throw exception, no transaction
+ try
+ {
+ firstSession.createProducer(null) ;
+ Assert.fail("exception expected, no active transaction") ;
+ }
+ catch (final JMSException jmse) {}
+
+ Assert.assertEquals("number of synchronizations", 0, jtaTransactionStrategy.numSynchronizations()) ;
+ Assert.assertEquals("number of resources", 0, jtaTransactionStrategy.numResources()) ;
+
+ jtaTransactionStrategy.begin() ;
+
+ // no exception as it should enlist
+ final MessageProducer firstProducer = firstSession.createProducer(null) ;
+
+ Assert.assertEquals("number of synchronizations", 1, jtaTransactionStrategy.numSynchronizations()) ;
+ Assert.assertEquals("number of resources", 1, jtaTransactionStrategy.numResources()) ;
+
+ final JmsSession secondSession = pool.getSession() ;
+
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ final MessageProducer secondProducer = secondSession.createProducer(null) ;
+
+ Assert.assertEquals("number of synchronizations", 1, jtaTransactionStrategy.numSynchronizations()) ;
+ Assert.assertEquals("number of resources", 1, jtaTransactionStrategy.numResources()) ;
+
+ pool.closeSession(secondSession) ;
+
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ secondProducer.close() ;
+
+ jtaTransactionStrategy.terminateTransaction(true) ;
+ // Shouldn't be back in the pool as there is still one reference open
+ Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+ // should throw exception, no transaction
+ try
+ {
+ firstProducer.getDestination() ;
+ Assert.fail("exception expected, no active transaction") ;
+ }
+ catch (final JMSException jmse) {}
+
+ // close should be okay
+ firstProducer.close() ;
+
+ pool.closeSession(firstSession) ;
+
+ Assert.assertEquals("current pool free count", 1, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+ Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+ }
+ finally
+ {
+ TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+ }
+ }
+
private Map<String, String> getPoolEnv()
{
final Map<String, String> env = new HashMap<String, String>() ;
@@ -704,6 +798,18 @@
return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {XASession.class},
new MockSessionInvocationHandler(Session.SESSION_TRANSACTED)) ;
}
+ else if ("toString".equals(methodName))
+ {
+ return toString() ;
+ }
+ else if ("hashCode".equals(methodName))
+ {
+ return hashCode() ;
+ }
+ else if ("equals".equals(methodName))
+ {
+ return equals(args[0]) ;
+ }
else
{
System.out.println("Connection method " + method.getName() + " called") ;
@@ -784,6 +890,18 @@
return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageProducer.class},
new MockNullInvocationHandler()) ;
}
+ else if ("toString".equals(methodName))
+ {
+ return toString() ;
+ }
+ else if ("hashCode".equals(methodName))
+ {
+ return hashCode() ;
+ }
+ else if ("equals".equals(methodName))
+ {
+ return equals(args[0]) ;
+ }
else
{
System.out.println("Session method " + method.getName() + " called") ;
@@ -859,6 +977,77 @@
}
}
+ private static final class MockJTATransactionStrategy extends NullTransactionStrategy
+ {
+ private Object tx ;
+ private ArrayList<XAResource> resources = new ArrayList<XAResource>() ;
+ private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>() ;
+
+ public void begin()
+ throws TransactionStrategyException
+ {
+ tx = new Object() ;
+ }
+
+ @Override
+ public boolean isActive()
+ throws TransactionStrategyException
+ {
+ return (tx != null) ;
+ }
+
+ @Override
+ public void registerSynchronization(final Synchronization sync)
+ throws TransactionStrategyException
+ {
+ synchronizations.add(sync) ;
+ }
+
+ @Override
+ public void enlistResource(final XAResource resource)
+ throws TransactionStrategyException
+ {
+ resources.add(resource) ;
+ }
+
+ @Override
+ public Object getTransaction()
+ throws TransactionStrategyException
+ {
+ return tx ;
+ }
+
+ public void terminateTransaction(final boolean commit)
+ {
+ final int status = (commit ? Status.STATUS_COMMITTED : Status.STATUS_ROLLEDBACK) ;
+
+ for(Synchronization sync: synchronizations)
+ {
+ sync.beforeCompletion() ;
+ }
+
+ tx = null ;
+ resources.clear() ;
+
+ for(Synchronization sync: synchronizations)
+ {
+ sync.afterCompletion(status) ;
+ }
+
+ synchronizations.clear();
+ }
+
+ public int numSynchronizations()
+ {
+ return synchronizations.size() ;
+ }
+
+ public int numResources()
+ {
+ return resources.size() ;
+ }
+ }
+
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(JmsConnectionPoolUnitTest.class);
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java 2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java 2010-10-08 12:42:13 UTC (rev 35463)
@@ -83,7 +83,7 @@
// Now try another session. Should be same session instance as last time.
// Make sure it's returned to the pool on closing...
- assertTrue(session1 == session2);
+ assertSame("Underlying session", session1.getSession(), session2.getSession());
assertEquals(1, sessPools.size());
assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
More information about the jboss-svn-commits mailing list