[jboss-svn-commits] JBL Code SVN: r26565 - in labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta: src/org/jboss/soa/esb/addressing/eprs and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri May 15 11:21:08 EDT 2009
Author: tfennelly
Date: 2009-05-15 11:21:08 -0400 (Fri, 15 May 2009)
New Revision: 26565
Added:
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java
Modified:
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
Log:
JmsConnectionPool changes to support multi-connections per pool with a max-sessions-per-connection config.
Modified: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-05-15 15:16:23 UTC (rev 26564)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -22,11 +22,7 @@
package org.jboss.internal.soa.esb.rosetta.pooling;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -58,9 +54,8 @@
import com.arjuna.common.util.propertyservice.PropertyManager;
/**
- * Interface that needs to be implemented to provide pool of connections.
- * @see DefaultConnectionPoolImpl
- * Default implementation of Connection Pool
+ * JmsConnectionPool.
+ *
* @author kstam
* @author <a href="mailto:daniel.bevenius at gmail.com">Daniel Bevenius</a>
* Date: March 10, 2007
@@ -83,36 +78,34 @@
private static final CompletionService<JmsSession> COMPLETION_SERVICE = new ExecutorCompletionService<JmsSession>(SESSION_EXECUTOR) ;
/** Maximum number of Sessions that will be created in this pool */
- private int MAX_SESSIONS = DEFAULT_POOL_SIZE; //TODO Make this manageable
-
- /** Time to sleep when trying to get a session. */
- private int SLEEP_TIME = DEFAULT_SLEEP;
-
- /** Number of free sessions in the pool that can be given out. Indexed by session key */
- private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
-
- /** Number of session that are currently in use. Indexed by session key mode */
- private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
-
- /** Reference to a Queue or Topic Connection, we only need one per pool */
- protected Connection jmsConnection ;
-
- /** The Indentifier of the pool */
- private Map<String, String> poolKey;
+ private int maxSessions = DEFAULT_POOL_SIZE; //TODO Make this manageable
/**
- * Mapping from transactions to sessions.
+ * The max number of sessions per connection. Defaults to the Max total number of sessions ("maxSessions").
*/
- private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+ private int maxSessionsPerConnection;
+
/**
- * Mapping from sessions to transactions.
+ * Maximum number of connections to be opened by this pool.
+ * <p/>
+ * Will be the truncated result of maxSessions / maxSessionsPerConnection.
*/
- private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+ private int maxConnections;
+ /** Time to sleep when trying to get a session. */
+ private int sleepTime = DEFAULT_SLEEP;
+
+ /** The Identifier of the pool */
+ private Map<String, String> poolKey;
+
/** Logger */
private Logger logger = Logger.getLogger(this.getClass());
-
/**
+ * JMS Session Pools.
+ */
+ private List<JmsSessionPool> sessionPools = new ArrayList<JmsSessionPool>();
+
+ /**
* The flag representing XA aware connections.
*/
private boolean isXAAware ;
@@ -120,96 +113,71 @@
* Flag signifying that the pool has been terminated.
*/
private boolean terminated ;
-
/**
* The pool instance id.
*/
private long id ;
-
+
/**
* Contructor of the pool.
*
*/
- public JmsConnectionPool(Map<String, String> poolKey)
- {
+ public JmsConnectionPool(Map<String, String> poolKey) throws ConnectionException {
this(poolKey, JmsConnectionPool.CONFIGURED_POOL_SIZE, JmsConnectionPool.CONFIGURED_SLEEP);
}
- public JmsConnectionPool(Map<String, String> poolKey, int poolSize, int sleepTime)
- {
+ public JmsConnectionPool(Map<String, String> poolKey, int poolSize, int sleepTime) throws ConnectionException {
this.poolKey = poolKey;
- MAX_SESSIONS = poolSize;
- SLEEP_TIME = sleepTime;
-
- freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-
- inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
- }
-
- /**
- * This is where we create the sessions.
- *
- * @param poolKey
- * @param transacted
- * @throws JMSException
- */
- private synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
- throws JMSException
- {
- final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
- public JmsSession call()
- throws JMSException
- {
- final JmsSession session ;
- if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
- } else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+ maxSessions = poolSize;
+ this.sleepTime = sleepTime;
+
+ String maxSessionsPerConnectionConfig = poolKey.get(JMSEpr.MAX_SESSIONS_PER_CONNECTION);
+ if(maxSessionsPerConnectionConfig != null) {
+ try {
+ maxSessionsPerConnection = Integer.parseInt(maxSessionsPerConnectionConfig.trim());
+ if(maxSessionsPerConnection < 1) {
+ throw new ConnectionException("Invalid '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "' configuration value '" + maxSessionsPerConnection + "'. Must be greater than 0.");
}
- return session ;
+ } catch(NumberFormatException e) {
+ throw new ConnectionException("Invalid '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "' configuration value '" + maxSessionsPerConnectionConfig.trim() + "'. Must be a valid Integer.");
}
- }) ;
-
- //Create a new Session
- ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
- // For now we only support JTA transacted sessions
- try
- {
- freeSessions.add(future.get());
+ } else {
+ maxSessionsPerConnection = maxSessions;
}
- catch (final InterruptedException ie) {} // ignore
- catch (final ExecutionException ee)
- {
- final Throwable th = ee.getCause() ;
- if (th instanceof JMSException)
- {
- throw (JMSException)th ;
- }
- if (th instanceof Error)
- {
- throw (Error)th ;
- }
- if (th instanceof RuntimeException)
- {
- throw (RuntimeException)th ;
- }
- }
- logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
+
+ maxConnections = (maxSessions/maxSessionsPerConnection);
}
+ protected int getMaxSessions() {
+ return maxSessions;
+ }
+
+ protected int getMaxSessionsPerConnection() {
+ return maxSessionsPerConnection;
+ }
+
+ protected int getMaxConnections() {
+ return maxConnections;
+ }
+
+ protected List<JmsSessionPool> getSessionPools() {
+ return sessionPools;
+ }
+
/**
* This method can be called whenever a connection is needed from the pool.
- *
+ *
* @return Connection to be used
* @throws ConnectionException
*/
public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
{
+ if (terminated)
+ {
+ throw new ConnectionException("Connection pool has been terminated") ;
+ }
+
try
{
return internalGetSession(acknowledgeMode) ;
@@ -234,22 +202,22 @@
throw jmse ;
}
}
-
+
private synchronized JmsSession internalGetSession(final int acknowledgeMode)
throws NamingException, JMSException, ConnectionException
{
- try {
- initConnection() ;
- } catch (final NamingContextException nce) {
- throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
+ if(sessionPools.isEmpty()) {
+ // Create the first pool entry...
+ addSessionPool();
}
+
final boolean transacted ;
try {
transacted = (isXAAware && TransactionStrategy.getTransactionStrategy(true).isActive()) ;
} catch (final TransactionStrategyException tse) {
throw new ConnectionException("Failed to determine current transaction context", tse) ;
}
-
+
if (transacted)
{
final JmsXASession currentSession = getXASession() ;
@@ -258,44 +226,113 @@
return currentSession ;
}
}
+
final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
-
- final long end = System.currentTimeMillis() + (SLEEP_TIME * 1000) ;
+ final long end = System.currentTimeMillis() + (sleepTime * 1000) ;
boolean emitExpiry = logger.isDebugEnabled() ;
- for(;;) {
- ArrayList<JmsSession> freeSessions = freeSessionsMap.get(mode );
- ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
- if (freeSessions.size() > 0)
+
+ while(true) {
+ // Cycle through all the existing session pools and try getting a
+ // free session. Note that JmsSessionPool.getSession will add a session
+ // to a pool that is not "full"...
+ for (JmsSessionPool sessionPool : sessionPools) {
+ JmsSession session = sessionPool.getSession(mode, transacted);
+
+ if(session != null) {
+ return session;
+ }
+ }
+
+ // OK... all the existing session pools are full and have no free sessions. If we can add
+ // another session pool, add it and then start this loop again...
+ if(sessionPools.size() < maxConnections) {
+ addSessionPool();
+ continue;
+ }
+
+ // We've reached our max permitted number of connections and the sessions
+ // associated with these connections are all in use. Drop into the
+ // delay below and wait for a session to be freed...
+ if (emitExpiry)
{
- final JmsSession session = freeSessions.remove(freeSessions.size()-1);
- inUseSessions.add(session);
- return session ;
- } else if (inUseSessions.size()<MAX_SESSIONS) {
- addAnotherSession(poolKey, transacted, mode);
- continue ;
- } else {
- if (emitExpiry)
+ logger.debug("The connection pool was exhausted, waiting for a session to be released.") ;
+ emitExpiry = false ;
+ }
+
+ // Wait and loop again...
+ final long now = System.currentTimeMillis() ;
+ final long delay = (end - now) ;
+ if (delay <= 0)
+ {
+ throw new ConnectionException("Could not obtain a JMS connection from the pool after "+ sleepTime +"s.");
+ }
+ else
+ {
+ try
{
- logger.debug("The connection pool was exhausted, waiting for a session to be released.") ;
- emitExpiry = false ;
+ wait(delay) ;
}
- final long now = System.currentTimeMillis() ;
- final long delay = (end - now) ;
- if (delay <= 0)
+ catch (final InterruptedException ie) {} // ignore
+ }
+ }
+ }
+
+ private JmsSessionPool addSessionPool() throws NamingException, JMSException, ConnectionException {
+ try {
+ JmsSessionPool sessionPool = new JmsSessionPool();
+
+ logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
+ Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
+ Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
+ try {
+ String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
+ Object factoryConnection=null;
+
+ try
{
- throw new ConnectionException("Could not obtain a JMS connection from the pool after "+SLEEP_TIME+"s.");
+ factoryConnection = jndiContext.lookup(connectionFactoryString);
+ } catch (NamingException ne) {
+ logger.info("Received NamingException, refreshing context.");
+ jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
+ factoryConnection = jndiContext.lookup(connectionFactoryString);
}
- else
- {
- try
+ final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
+ final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
+ boolean useJMSSecurity = (username != null && password != null);
+ logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
+ if (factoryConnection instanceof XAConnectionFactory) {
+ final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
+ sessionPool.jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
+ isXAAware = true ;
+ sessionPool.freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+ sessionPool.inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+ } else if (factoryConnection instanceof ConnectionFactory) {
+ final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
+ sessionPool.jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
+ }
+
+ sessionPool.jmsConnection.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0)
{
- wait(delay) ;
+ // This will result in all connections (and their sessions) on this pool
+ // being closed...
+ cleanSessionPool() ;
}
- catch (final InterruptedException ie) {} // ignore
- }
+ }) ;
+ sessionPool.jmsConnection.start();
+
+ // And add it to the pool...
+ sessionPools.add(sessionPool);
+
+ return sessionPool;
+ } finally {
+ NamingContextPool.releaseNamingContext(jndiContext) ;
}
+ } catch (final NamingContextException nce) {
+ throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
}
}
+
/**
* This method can be called whenever a Session is needed from the pool.
* @return
@@ -337,36 +374,11 @@
*/
synchronized void handleCloseSession(final JmsSession session)
{
- if (session.isSuspect())
- {
- logger.debug("Session is suspect, dropping") ;
- handleReleaseSession(session) ;
+ JmsSessionPool sessionPool = findOwnerPool(session);
+
+ if(sessionPool != null) {
+ sessionPool.handleCloseSession(session);
}
- else
- {
- if (session.getId() != id)
- {
- logger.debug("Session is from a previous incarnation, dropping") ;
- }
- else
- {
- final int mode ;
- try {
- mode = session.getAcknowledgeMode() ;
- } catch (final JMSException jmse) {
- logger.warn("JMSException while calling getAcknowledgeMode") ;
- logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
- return ;
- }
-
- final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
- if (sessions != null) {
- sessions.add(session) ;
- }
- }
- session.releaseResources() ;
- releaseInUseSession(session) ;
- }
}
/**
@@ -390,19 +402,12 @@
*/
private void releaseInUseSession(final JmsSession session)
{
- final int mode ;
- try {
- mode = session.getAcknowledgeMode() ;
- } catch (final JMSException jmse) {
- logger.warn("JMSException while calling getAcknowledgeMode") ;
- logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
- return ;
+ JmsSessionPool sessionPool = findOwnerPool(session);
+
+ if(sessionPool != null) {
+ sessionPool.releaseInUseSession(session);
}
-
- final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
- if (sessions != null) {
- sessions.remove(session) ;
- }
+
notifyAll() ;
}
@@ -435,34 +440,14 @@
*/
private void cleanSessionPool()
{
- final Connection connection ;
- synchronized(this)
- {
- if (terminated)
- {
- return ;
+ for(JmsSessionPool sessionPool : sessionPools) {
+ try {
+ sessionPool.cleanSessionPool();
+ } catch(Exception e) {
+ logger.error("Exception while cleaning JmsSessionPool.", e);
}
- id++ ;
- for (List<JmsSession> list : freeSessionsMap.values())
- {
- list.clear() ;
- }
- for (List<JmsSession> list : inUseSessionsMap.values())
- {
- list.clear() ;
- }
- transactionsToSessions.clear() ;
- sessionsToTransactions.clear() ;
-
- logger.debug("Cleared the session pool now closing the connection to the factory.");
- connection = jmsConnection ;
- jmsConnection = null ;
}
- if (connection!=null) {
- try {
- connection.close();
- } catch (final Exception ex) {} // ignore
- }
+ sessionPools.clear();
}
/**
@@ -471,19 +456,16 @@
*/
public synchronized void removeSessionPool()
{
- freeSessionsMap = null ;
- inUseSessionsMap = null ;
- transactionsToSessions = null ;
- sessionsToTransactions = null ;
-
- logger.debug("Emptied the session pool now closing the connection to the factory.");
- if (jmsConnection!=null) {
+ for(JmsSessionPool sessionPool : sessionPools) {
try {
- jmsConnection.close();
- } catch (final Exception ex) {} // ignore
- jmsConnection=null;
- terminated = true ;
+ sessionPool.removeSessionPool();
+ } catch(Exception e) {
+ logger.error("Exception while removing JmsSessionPool.", e);
+ }
}
+
+ sessionPools.clear();
+ terminated = true ;
JmsConnectionPoolContainer.removePool(poolKey);
}
/**
@@ -514,9 +496,13 @@
* @return int the number of in use sessions
*/
public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
- final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
- final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
- return numFreeSessions;
+ int count = 0;
+
+ for(JmsSessionPool sessionPool : sessionPools) {
+ count += sessionPool.getFreeSessionsInPool(acknowledgeMode);
+ }
+
+ return count;
}
/**
@@ -526,69 +512,13 @@
* @return int the number of in use sessions
*/
public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
- final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
- final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
- return numInUseSessions;
- }
-
- /**
- * Initialise the connection.
- * @throws ConnectionException If the pool has already been terminated.
- * @throws NamingContextException for errors obtaining a naming context
- * @throws NamingException for errors accessing a naming context
- * @throws JMSException for errors creating the connection
- */
- private synchronized void initConnection()
- throws ConnectionException, NamingContextException, NamingException, JMSException
- {
- if (terminated)
- {
- throw new ConnectionException("Connection pool has been terminated") ;
+ int count = 0;
+
+ for(JmsSessionPool sessionPool : sessionPools) {
+ count += sessionPool.getInUseSessionsInPool(acknowledgeMode);
}
-
- if (jmsConnection==null) {
- JmsConnectionPoolContainer.addToPool(poolKey, this);
- logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
- Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
- Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
- try {
- String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
- Object factoryConnection=null;
- try
- {
- factoryConnection = jndiContext.lookup(connectionFactoryString);
- } catch (NamingException ne) {
- logger.info("Received NamingException, refreshing context.");
- jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
- factoryConnection = jndiContext.lookup(connectionFactoryString);
- }
- final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
- final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
- boolean useJMSSecurity = (username != null && password != null);
- logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
- if (factoryConnection instanceof XAConnectionFactory) {
- final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
- jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
- isXAAware = true ;
- freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
- inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
- } else if (factoryConnection instanceof ConnectionFactory) {
- final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
- jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
- }
-
- jmsConnection.setExceptionListener(new ExceptionListener() {
- public void onException(JMSException arg0)
- {
- cleanSessionPool() ;
- }
- }) ;
- jmsConnection.start();
- } finally {
- NamingContextPool.releaseNamingContext(jndiContext) ;
- }
- }
+ return count;
}
/**
@@ -615,7 +545,15 @@
throws ConnectionException
{
final Object tx = getTransaction() ;
- return transactionsToSessions.get(tx) ;
+
+ for(JmsSessionPool sessionPool : sessionPools) {
+ JmsXASession session = sessionPool.transactionsToSessions.get(tx);
+ if(session != null) {
+ return session;
+ }
+ }
+
+ return null;
}
/**
@@ -626,25 +564,36 @@
synchronized void associateTransaction(final JmsXASession session)
throws ConnectionException
{
- final Object tx = getTransaction() ;
- if (tx == null)
- {
- throw new ConnectionException("No active transaction") ;
+ JmsSessionPool sessionPool = findOwnerPool(session);
+
+ if(sessionPool != null) {
+ sessionPool.associateTransaction(session);
}
- transactionsToSessions.put(tx, session) ;
- sessionsToTransactions.put(session, tx) ;
}
-
+
/**
* Disassociate the JMS XA Session from a transaction.
* @param session The XA session.
*/
synchronized void disassociateTransaction(final JmsXASession session)
{
- final Object tx = sessionsToTransactions.remove(session) ;
- transactionsToSessions.remove(tx) ;
+ JmsSessionPool sessionPool = findOwnerPool(session);
+
+ if(sessionPool != null) {
+ sessionPool.disassociateTransaction(session);
+ }
}
-
+
+ synchronized JmsSessionPool findOwnerPool(final JmsSession session) {
+ for(JmsSessionPool sessionPool : sessionPools) {
+ if(sessionPool.isOwnerPool(session)) {
+ return sessionPool;
+ }
+ }
+
+ return null;
+ }
+
static
{
PropertyManager prop = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE);
@@ -676,9 +625,322 @@
}
}
}
-
+
+ class JmsSessionPool {
+
+ /** Reference to a Queue or Topic Connection, we only need one per pool */
+ protected Connection jmsConnection ;
+
+ /** Number of free sessions in the pool that can be given out. Indexed by session key */
+ private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
+
+ /** Number of session that are currently in use. Indexed by session key mode */
+ private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
+
+ /**
+ * Mapping from transactions to sessions.
+ */
+ private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+
+ /**
+ * Mapping from sessions to transactions.
+ */
+ private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+
+ private JmsSessionPool() {
+ freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+
+ inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+ }
+
+ private boolean isOwnerPool(JmsSession session) {
+ if (isInList(session, freeSessionsMap.values())) {
+ return true;
+ } else if (isInList(session, inUseSessionsMap.values())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean isInList(JmsSession session, Collection<ArrayList<JmsSession>> sessionLists) {
+ for(ArrayList<JmsSession> sessionList : sessionLists) {
+ if(sessionList.contains(session)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public synchronized void removeSessionPool() {
+ freeSessionsMap = null ;
+ inUseSessionsMap = null ;
+ transactionsToSessions = null ;
+ sessionsToTransactions = null ;
+
+ logger.debug("Emptied the session pool now closing the connection to the factory.");
+ if (jmsConnection!=null) {
+ try {
+ jmsConnection.close();
+ } catch (final Exception ex) {} // ignore
+ jmsConnection=null;
+ }
+ }
+
+ public synchronized JmsSession getSession(int acknowledgeMode, boolean transacted) throws JMSException {
+
+ ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode );
+ ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(acknowledgeMode);
+
+ if (freeSessions.size() > 0)
+ {
+ final JmsSession session = freeSessions.remove(freeSessions.size()-1);
+ inUseSessions.add(session);
+ return session ;
+ } else if (getSessionsInPool() < maxSessionsPerConnection) {
+ final JmsSession session = addAnotherSession(poolKey, transacted, acknowledgeMode);
+ if(session != null) {
+ inUseSessions.add(session);
+ return session ;
+ }
+ }
+
+ return null;
+ }
+
+
+ /**
+ * This is where we create the sessions.
+ *
+ * @param poolKey
+ * @param transacted
+ * @throws JMSException
+ */
+ private synchronized JmsSession addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
+ throws JMSException
+ {
+ // Sessions need to be created in this way because of an issue with JBM.
+ // See https://jira.jboss.org/jira/browse/JBESB-1799
+ final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
+ public JmsSession call()
+ throws JMSException
+ {
+ final JmsSession session ;
+
+ if (transacted) {
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+ } else {
+ session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+ }
+
+ return session ;
+ }
+ }) ;
+
+ //Create a new Session
+ ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
+ // For now we only support JTA transacted sessions
+ try
+ {
+ JmsSession session = future.get();
+ logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
+ return session;
+ }
+ catch (final InterruptedException ie) {} // ignore
+ catch (final ExecutionException ee)
+ {
+ final Throwable th = ee.getCause() ;
+ if (th instanceof JMSException)
+ {
+ throw (JMSException)th ;
+ }
+ if (th instanceof Error)
+ {
+ throw (Error)th ;
+ }
+ if (th instanceof RuntimeException)
+ {
+ throw (RuntimeException)th ;
+ }
+ }
+
+ return null;
+ }
+
+ public void cleanSessionPool() {
+ final Connection connection ;
+ synchronized(this)
+ {
+ if (terminated)
+ {
+ return ;
+ }
+ id++ ;
+ for (List<JmsSession> list : freeSessionsMap.values())
+ {
+ list.clear() ;
+ }
+ for (List<JmsSession> list : inUseSessionsMap.values())
+ {
+ list.clear() ;
+ }
+ transactionsToSessions.clear() ;
+ sessionsToTransactions.clear() ;
+
+ logger.debug("Cleared the session pool now closing the connection to the factory.");
+ connection = jmsConnection ;
+ jmsConnection = null ;
+ }
+ if (connection!=null) {
+ try {
+ connection.close();
+ } catch (final Exception ex) {} // ignore
+ }
+ }
+
+ /**
+ * Returns the total number of sessions in the pool.
+ * @return The total number of sessions in the pool.
+ */
+ private synchronized int getSessionsInPool() {
+ int total = 0;
+
+ total += getSessionsInMap(freeSessionsMap);
+ total += getSessionsInMap(inUseSessionsMap);
+
+ return total;
+ }
+
+ private synchronized int getSessionsInMap(Map<Integer,ArrayList<JmsSession>> sessionsMap) {
+ Collection<ArrayList<JmsSession>> sessionLists = sessionsMap.values();
+ int total = 0;
+
+ for(ArrayList<JmsSession> sessionList : sessionLists) {
+ total += sessionList.size();
+ }
+
+ return total;
+ }
+
+ /**
+ * Returns the total number of sessions for the specified acknowledge mode
+ *
+ * @param acknowledgeMode the acknowledge mode of sessions
+ * @return
+ */
+ public synchronized int getSessionsInPool(final int acknowledgeMode) {
+ return getFreeSessionsInPool(acknowledgeMode) + getInUseSessionsInPool(acknowledgeMode) ;
+ }
+
+ /**
+ * Get the number of free sessions created with the specified acknowledge mode
+ * @param acknowledgeMode the acknowledge mode of sessions
+ * @return int the number of free sessions
+ */
+ public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
+ final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
+ final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
+ return numFreeSessions;
+ }
+
+ /**
+ * Get the number of sessions that are in use and that were
+ * created with the specified acknowledge mode
+ * @param acknowledgeMode the acknowledge mode of sessions
+ * @return int the number of in use sessions
+ */
+ public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
+ final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
+ final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
+ return numInUseSessions;
+ }
+
+ /**
+ * Associate the JMS XA Session with the current transaction.
+ * @param session The XA session.
+ * @throws ConnectionException if there is no transaction active.
+ */
+ synchronized void associateTransaction(final JmsXASession session)
+ throws ConnectionException
+ {
+ final Object tx = getTransaction() ;
+ if (tx == null)
+ {
+ throw new ConnectionException("No active transaction") ;
+ }
+ transactionsToSessions.put(tx, session) ;
+ sessionsToTransactions.put(session, tx) ;
+ }
+
+ /**
+ * Disassociate the JMS XA Session from a transaction.
+ * @param session The XA session.
+ */
+ synchronized void disassociateTransaction(final JmsXASession session)
+ {
+ final Object tx = sessionsToTransactions.remove(session) ;
+ transactionsToSessions.remove(tx) ;
+ }
+
+ public void releaseInUseSession(JmsSession session) {
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.remove(session) ;
+ }
+ }
+
+ public void handleCloseSession(JmsSession session) {
+ if (session.isSuspect())
+ {
+ logger.debug("Session is suspect, dropping") ;
+ handleReleaseSession(session) ;
+ }
+ else
+ {
+ if (session.getId() != id)
+ {
+ logger.debug("Session is from a previous incarnation, dropping") ;
+ }
+ else
+ {
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.add(session) ;
+ }
+ }
+ session.releaseResources() ;
+ releaseInUseSession(session) ;
+ }
+ }
+ }
+
/**
* Thread factory returning daemon threads.
+ * <p/>
+ * Required as part of the fix for https://jira.jboss.org/jira/browse/JBESB-1799
+ *
* @author kevin
*/
private static final class DaemonThreadFactory implements ThreadFactory
@@ -687,7 +949,7 @@
* The default executor factory.
*/
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory() ;
-
+
/**
* Return a new daemon thread.
* @param runnable The runnable associated with the thread.
Modified: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java 2009-05-15 15:16:23 UTC (rev 26564)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -75,9 +75,11 @@
public static final String DESTINATION_NAME_TAG = "destination-name";
public static final String CONNECTION_FACTORY_TAG = "connection-factory";
-
- public static final String JNDI_PKG_PREFIX_TAG = "jndi-pkg-prefix";
+ public static final String MAX_SESSIONS_PER_CONNECTION = "max-sessions-per-connection";
+
+ public static final String JNDI_PKG_PREFIX_TAG = "jndi-pkg-prefix";
+
public static final String JNDI_URL_TAG = "jndi-URL";
public static final String JNDI_CONTEXT_FACTORY_TAG = "jndi-context-factory";
Copied: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java (from rev 26274, labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JBossActivationMapper.java)
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,104 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.soa.esb.listeners.jca;
+
+import java.util.Map;
+
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.jboss.soa.esb.ConfigurationException;
+
+
+/**
+ * Activation mapper implementation for IBM MQ (aka WMQ).
+ *
+ * @author <a href="kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class WMQActivationMapper implements ActivationMapper
+{
+ /**
+ * Initialise the destination name in the activation configuration.
+ * @param activationConfig The current activation configuration.
+ * @param name The destination name.
+ * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+ */
+ public void setDestination(final Map<String, String> activationConfig, final String name)
+ throws ConfigurationException
+ {
+ activationConfig.put("destination", name) ;
+ }
+
+ /**
+ * Initialise the destination name in the activation configuration.
+ * @param activationConfig The current activation configuration.
+ * @param providerAdapterJNDI The provider adapter JNDI value or null is not specified.
+ * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+ */
+ public void setProviderAdapterJNDI(final Map<String, String> activationConfig, final String providerAdapterJNDI)
+ throws ConfigurationException
+ {
+ if (providerAdapterJNDI != null)
+ {
+ activationConfig.put("providerAdapterJNDI", providerAdapterJNDI) ;
+ }
+ }
+
+ /**
+ * Initialise the destination name in the activation configuration.
+ * @param activationConfig The current activation configuration.
+ * @param queue True if specifying a JMS Queue, false is specifying a JMS Topic.
+ * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+ */
+ public void setDestinationType(final Map<String, String> activationConfig, final boolean queue)
+ throws ConfigurationException
+ {
+ activationConfig.put("destinationType", queue ? Queue.class.getName() :Topic.class.getName()) ;
+ }
+
+ /**
+ * Initialise the destination name in the activation configuration.
+ * @param activationConfig The current activation configuration.
+ * @param messageSelector The message selector or null if not specified.
+ * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+ */
+ public void setMessageSelector(final Map<String, String> activationConfig, final String messageSelector)
+ throws ConfigurationException
+ {
+ if (messageSelector != null)
+ {
+ activationConfig.put("messageSelector", messageSelector) ;
+ }
+ }
+
+ /**
+ * Initialise the destination name in the activation configuration.
+ * @param activationConfig The current activation configuration.
+ * @param maxThreads The maximum thread value or null if not specified.
+ * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+ */
+ public void setMaxThreads(final Map<String, String> activationConfig, final Integer maxThreads)
+ throws ConfigurationException
+ {
+ // Not applicable to WMQ!!
+ }
+}
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import junit.framework.TestCase;
+
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.internal.soa.esb.rosetta.pooling.jms.MockJMSConnectionFactory;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MaxSessionsPerConnectionUnitTest extends TestCase {
+
+ private Map<String, String> jndiEnv = new HashMap<String, String>();
+
+ public MaxSessionsPerConnectionUnitTest() {
+ jndiEnv.put(Context.INITIAL_CONTEXT_FACTORY, MockInitialContextFactory.class.getName());
+ jndiEnv.put(JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ jndiEnv.put(JMSEpr.MAX_SESSIONS_PER_CONNECTION, "2");
+ }
+
+ public void test() throws NamingException, JMSException, ConnectionException {
+ MockJndiContextHandler.objects.put("ConnectionFactory", new MockJMSConnectionFactory(2));
+ JmsConnectionPool connPool = new JmsConnectionPool(jndiEnv);
+ List<JmsConnectionPool.JmsSessionPool> sessPools = connPool.getSessionPools();
+
+ assertEquals(20, connPool.getMaxSessions());
+ assertEquals(2, connPool.getMaxSessionsPerConnection());
+ assertEquals(10, connPool.getMaxConnections());
+ assertEquals(0, sessPools.size());
+
+ try {
+ JmsSession session1 = connPool.getSession();
+
+ // Just get 1 session. Make sure it's returned to the
+ // pool on closing...
+ assertEquals(1, sessPools.size());
+ assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ connPool.handleCloseSession(session1);
+ assertEquals(1, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+ JmsSession session2 = connPool.getSession();
+
+ // Now try another session. Should be same session instance as last time.
+ // Make sure it's returned to the pool on closing...
+ assertTrue(session1 == session2);
+ assertEquals(1, sessPools.size());
+ assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ connPool.handleCloseSession(session2);
+ assertEquals(1, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+ // Now try 3 sessions. Should cause another pool to be created.
+ // Make sure it's returned to the pool on closing...
+ session1 = connPool.getSession();
+ session2 = connPool.getSession();
+ JmsSession session3 = connPool.getSession();
+
+ assertEquals(2, sessPools.size());
+ assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(2, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPools.get(1).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(1, sessPools.get(1).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ connPool.handleCloseSession(session1);
+ connPool.handleCloseSession(session2);
+ connPool.handleCloseSession(session3);
+ assertEquals(2, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(1, sessPools.get(1).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPools.get(1).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+ List<JmsSession> sessions = getSessions(connPool, 20);
+ assertEquals(10, sessPools.size());
+ assertAllInUse(sessPools);
+ closeAll(sessions, connPool);
+ assertAllFree(sessPools);
+ } finally {
+ connPool.removeSessionPool();
+ }
+ }
+
+ private void closeAll(List<JmsSession> sessions, JmsConnectionPool connPool) {
+ for(JmsSession session : sessions) {
+ connPool.handleCloseSession(session);
+ }
+ }
+
+ private void assertAllInUse(List<JmsConnectionPool.JmsSessionPool> sessPools) {
+ for(JmsConnectionPool.JmsSessionPool sessPool : sessPools) {
+ assertEquals(0, sessPool.getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(2, sessPool.getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ }
+ }
+
+ private void assertAllFree(List<JmsConnectionPool.JmsSessionPool> sessPools) {
+ for(JmsConnectionPool.JmsSessionPool sessPool : sessPools) {
+ assertEquals(2, sessPool.getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ assertEquals(0, sessPool.getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+ }
+ }
+
+ private List<JmsSession> getSessions(JmsConnectionPool connPool, int numSessions) throws NamingException, JMSException, ConnectionException {
+ List<JmsSession> sessions = new ArrayList<JmsSession>();
+ for(int i = 0; i < numSessions; i++) {
+ sessions.add(connPool.getSession());
+ }
+ return sessions;
+ }
+}
Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.util.Hashtable;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockInitialContextFactory implements InitialContextFactory {
+
+ public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException {
+ return MockJndiContextHandler.getInstance();
+ }
+}
Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.naming.Context;
+import javax.naming.NameNotFoundException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockJndiContextHandler implements InvocationHandler {
+
+ public static Map<String, Object> objects = new HashMap<String, Object>();
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+ if(method.getName().equals("lookup")) {
+ String name = (String) args[0];
+ Object object = objects.get(name);
+
+ if(object == null) {
+ throw new NameNotFoundException("Obect '" + name + "' not found.");
+ }
+
+ return object;
+ }
+
+ return null;
+ }
+
+ public static Context getInstance() {
+ return (Context) Proxy.newProxyInstance(Context.class.getClassLoader(),
+ new Class[]{Context.class},
+ new MockJndiContextHandler());
+ }
+}
Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java 2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling.jms;
+
+import org.jboss.internal.soa.esb.rosetta.pooling.MockJndiContextHandler;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockJMSConnectionFactory implements ConnectionFactory {
+
+ private int maxSessionsPerConnection;
+
+ public MockJMSConnectionFactory(int maxSessionsPerConnection) {
+ this.maxSessionsPerConnection = maxSessionsPerConnection;
+ }
+
+ public Connection createConnection() throws JMSException {
+ return createConnectionHandler();
+ }
+
+ public Connection createConnection(String s, String s1) throws JMSException {
+ return createConnectionHandler();
+ }
+
+ private Connection createConnectionHandler() {
+ return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
+ new Class[]{Connection.class},
+ new MockJMSConnectionHandler(maxSessionsPerConnection));
+ }
+
+ private class MockJMSConnectionHandler implements InvocationHandler {
+
+ private int maxSessionsPerConnection;
+ private List<Session> sessions = new ArrayList<Session>();
+
+ public MockJMSConnectionHandler(int maxSessionsPerConnection) {
+ this.maxSessionsPerConnection = maxSessionsPerConnection;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+ if(method.getName().equals("createSession")) {
+ System.out.println("Creating JMS Session");
+
+ if(maxSessionsPerConnection > 0 && sessions.size() == maxSessionsPerConnection) {
+ throw new JMSException("Unable to create JMS Session on Connection. Maximum of " + maxSessionsPerConnection + " Sessions/Connection.");
+ }
+
+ Session session = (Session) Proxy.newProxyInstance(Session.class.getClassLoader(),
+ new Class[]{Session.class},
+ new MockJMSSessionHandler());
+
+ sessions.add(session);
+
+ return session;
+ }
+
+ System.out.println("MockJMSConnectionHandler: Call to " + method.getName());
+
+ return null;
+ }
+ }
+
+ private class MockJMSSessionHandler implements InvocationHandler {
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+ if(method.getName().equals("getAcknowledgeMode")) {
+ return Session.AUTO_ACKNOWLEDGE;
+ }
+
+ System.out.println("MockJMSSessionHandler: Call to " + method.getName());
+
+ return null;
+ }
+ }
+}
More information about the jboss-svn-commits
mailing list