[jboss-svn-commits] JBL Code SVN: r26565 - in labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta: src/org/jboss/soa/esb/addressing/eprs and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri May 15 11:21:08 EDT 2009


Author: tfennelly
Date: 2009-05-15 11:21:08 -0400 (Fri, 15 May 2009)
New Revision: 26565

Added:
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java
Modified:
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
Log:
JmsConnectionPool changes to support multi-connections per pool with a max-sessions-per-connection config.

Modified: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2009-05-15 15:16:23 UTC (rev 26564)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -22,11 +22,7 @@
 package org.jboss.internal.soa.esb.rosetta.pooling;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -58,9 +54,8 @@
 import com.arjuna.common.util.propertyservice.PropertyManager;
 
 /**
- * Interface that needs to be implemented to provide pool of connections.
- * @see DefaultConnectionPoolImpl
- * Default implementation of Connection Pool
+ * JmsConnectionPool.
+ *
  * @author kstam
  * @author <a href="mailto:daniel.bevenius at gmail.com">Daniel Bevenius</a>				
  * Date: March 10, 2007
@@ -83,36 +78,34 @@
 	private static final CompletionService<JmsSession> COMPLETION_SERVICE = new ExecutorCompletionService<JmsSession>(SESSION_EXECUTOR) ;
 	
     /** Maximum number of Sessions that will be created in this pool */
-    private int MAX_SESSIONS = DEFAULT_POOL_SIZE;    //TODO Make this manageable
-    
-    /** Time to sleep when trying to get a session. */
-    private int SLEEP_TIME = DEFAULT_SLEEP;
-    
-    /** Number of free sessions in the pool that can be given out. Indexed by session key */
-    private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
-    
-    /** Number of session that are currently in use. Indexed by session key mode */
-    private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
-    
-    /** Reference to a Queue or Topic Connection, we only need one per pool */
-    protected Connection jmsConnection ;
-    
-    /** The Indentifier of the pool */
-    private Map<String, String> poolKey;
+    private int maxSessions = DEFAULT_POOL_SIZE;    //TODO Make this manageable
 
     /**
-     * Mapping from transactions to sessions.
+     * The max number of sessions per connection.  Defaults to the Max total number of sessions ("maxSessions").
      */
-    private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+    private int maxSessionsPerConnection;
+
     /**
-     * Mapping from sessions to transactions.
+     * Maximum number of connections to be opened by this pool.
+     * <p/>
+     * Will be the truncated result of maxSessions / maxSessionsPerConnection.
      */
-    private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+    private int maxConnections;
 
+    /** Time to sleep when trying to get a session. */
+    private int sleepTime = DEFAULT_SLEEP;
+
+    /** The Identifier of the pool */
+    private Map<String, String> poolKey;
+
     /** Logger */
     private Logger logger = Logger.getLogger(this.getClass());
-    
     /**
+     * JMS Session Pools.
+     */
+    private List<JmsSessionPool> sessionPools = new ArrayList<JmsSessionPool>();
+
+    /**
      * The flag representing XA aware connections.
      */
     private boolean isXAAware ;
@@ -120,96 +113,71 @@
      * Flag signifying that the pool has been terminated.
      */
     private boolean terminated ;
-    
     /**
      * The pool instance id.
      */
     private long id ;
-    
+
     /**
      * Contructor of the pool.
      * 
      */
-    public JmsConnectionPool(Map<String, String> poolKey) 
-    {
+    public JmsConnectionPool(Map<String, String> poolKey) throws ConnectionException {
     	this(poolKey, JmsConnectionPool.CONFIGURED_POOL_SIZE, JmsConnectionPool.CONFIGURED_SLEEP);
     }
     
-    public JmsConnectionPool(Map<String, String> poolKey, int poolSize, int sleepTime) 
-    {
+    public JmsConnectionPool(Map<String, String> poolKey, int poolSize, int sleepTime) throws ConnectionException {
         this.poolKey = poolKey;
         
-        MAX_SESSIONS = poolSize;
-        SLEEP_TIME = sleepTime;
-        
-        freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-        freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-        freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-        
-        inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-        inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-        inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
-    }
-   
-    /**
-     * This is where we create the sessions. 
-     * 
-     * @param poolKey
-     * @param transacted
-     * @throws JMSException
-     */
-    private  synchronized void addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
-        throws JMSException
-    {
-        final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
-            public JmsSession call()
-                throws JMSException
-            {
-                final JmsSession session ;
-                if (transacted) {
-                    session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
-                } else {
-                    session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+        maxSessions = poolSize;
+        this.sleepTime = sleepTime;
+
+        String maxSessionsPerConnectionConfig = poolKey.get(JMSEpr.MAX_SESSIONS_PER_CONNECTION);
+        if(maxSessionsPerConnectionConfig != null) {
+            try {
+                maxSessionsPerConnection = Integer.parseInt(maxSessionsPerConnectionConfig.trim());
+                if(maxSessionsPerConnection < 1) {
+                    throw new ConnectionException("Invalid '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "' configuration value '" +  maxSessionsPerConnection + "'.  Must be greater than 0.");
                 }
-                return session ;
+            } catch(NumberFormatException e) {
+                throw new ConnectionException("Invalid '" + JMSEpr.MAX_SESSIONS_PER_CONNECTION + "' configuration value '" +  maxSessionsPerConnectionConfig.trim() + "'.  Must be a valid Integer.");
             }
-        }) ;
-        
-        //Create a new Session
-        ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
-        // For now we only support JTA transacted sessions
-        try
-        {
-            freeSessions.add(future.get());
+        } else {
+            maxSessionsPerConnection = maxSessions;
         }
-        catch (final InterruptedException ie) {} // ignore
-        catch (final ExecutionException ee)
-        {
-            final Throwable th = ee.getCause() ;
-            if (th instanceof JMSException)
-            {
-                throw (JMSException)th ;
-            }
-            if (th instanceof Error)
-            {
-                throw (Error)th ;
-            }
-            if (th instanceof RuntimeException)
-            {
-                throw (RuntimeException)th ;
-            }
-        }
-        logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
+
+        maxConnections = (maxSessions/maxSessionsPerConnection);
     }
 
+    protected int getMaxSessions() {
+        return maxSessions;
+    }
+
+    protected int getMaxSessionsPerConnection() {
+        return maxSessionsPerConnection;
+    }
+
+    protected int getMaxConnections() {
+        return maxConnections;
+    }
+
+    protected List<JmsSessionPool> getSessionPools() {
+        return sessionPools;
+    }
+
     /**
      *  This method can be called whenever a connection is needed from the pool.
-     *  
+     *
      * @return Connection to be used
      * @throws ConnectionException
      */
     public synchronized JmsSession getSession(final int acknowledgeMode) throws NamingException, JMSException, ConnectionException
     {
+        if (terminated)
+        {
+            throw new ConnectionException("Connection pool has been terminated") ;
+        }
+
         try
         {
             return internalGetSession(acknowledgeMode) ;
@@ -234,22 +202,22 @@
             throw jmse ;
         }
     }
-    
+
     private synchronized JmsSession internalGetSession(final int acknowledgeMode)
         throws NamingException, JMSException, ConnectionException
     {
-        try {
-            initConnection() ;
-        } catch (final NamingContextException nce) {
-            throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
+        if(sessionPools.isEmpty()) {
+            // Create the first pool entry...
+            addSessionPool();
         }
+
         final boolean transacted ;
         try {
             transacted = (isXAAware && TransactionStrategy.getTransactionStrategy(true).isActive()) ;
         } catch (final TransactionStrategyException tse) {
             throw new ConnectionException("Failed to determine current transaction context", tse) ;
         }
-        
+
         if (transacted)
         {
             final JmsXASession currentSession = getXASession() ;
@@ -258,44 +226,113 @@
                 return currentSession ;
             }
         }
+
         final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
-        
-        final long end = System.currentTimeMillis() + (SLEEP_TIME * 1000) ;
+        final long end = System.currentTimeMillis() + (sleepTime * 1000) ;
         boolean emitExpiry = logger.isDebugEnabled() ;
-        for(;;) {
-            ArrayList<JmsSession> freeSessions = freeSessionsMap.get(mode );
-            ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(mode);
-            if (freeSessions.size() > 0)
+
+        while(true) {
+            // Cycle through all the existing session pools and try getting a
+            // free session.  JmsSessionPool.getSession will add a session
+            // to a pool that is not "full"...
+            for (JmsSessionPool sessionPool : sessionPools) {
+                JmsSession session = sessionPool.getSession(mode, transacted);
+
+                if(session != null) {
+                    return session;
+                }   
+            }
+
+            // OK... all the existing session pools are full and have no free sessions.  If we can add
+            // another session pool, add it and then start this loop again...
+            if(sessionPools.size() < maxConnections) {
+                addSessionPool();
+                continue;
+            }
+
+            // We've reached our max permitted number of connections and the sessions
+            // associated with these connections are all in use. Drop into the
+            // delay below and wait for a session to be freed...
+            if (emitExpiry)
             {
-                final JmsSession session = freeSessions.remove(freeSessions.size()-1);
-                inUseSessions.add(session);
-                return session ;
-            } else if (inUseSessions.size()<MAX_SESSIONS) {
-                addAnotherSession(poolKey, transacted, mode);
-                continue ;
-            } else {
-                if (emitExpiry)
+                logger.debug("The connection pool was exhausted, waiting for a session to be released.") ;
+                emitExpiry = false ;
+            }
+
+            // Wait and loop again...
+            final long now = System.currentTimeMillis() ;
+            final long delay = (end - now) ;
+            if (delay <= 0)
+            {
+                throw new ConnectionException("Could not obtain a JMS connection from the pool after "+ sleepTime +"s.");
+            }
+            else
+            {
+                try
                 {
-                    logger.debug("The connection pool was exhausted, waiting for a session to be released.") ;
-                    emitExpiry = false ;
+                    wait(delay) ;
                 }
-                final long now = System.currentTimeMillis() ;
-                final long delay = (end - now) ;
-                if (delay <= 0)
+                catch (final InterruptedException ie) {} // ignore
+            }
+        }
+    }
+
+    private JmsSessionPool addSessionPool() throws NamingException, JMSException, ConnectionException {
+        try {
+            JmsSessionPool sessionPool = new JmsSessionPool();
+
+            logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
+            Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
+            Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
+            try {
+                String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
+                Object factoryConnection=null;
+
+                try
                 {
-                    throw new ConnectionException("Could not obtain a JMS connection from the pool after "+SLEEP_TIME+"s.");
+                    factoryConnection = jndiContext.lookup(connectionFactoryString);
+                } catch (NamingException ne) {
+                    logger.info("Received NamingException, refreshing context.");
+                    jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
+                    factoryConnection = jndiContext.lookup(connectionFactoryString);
                 }
-                else
-                {
-                    try
+                final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
+                final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
+                boolean useJMSSecurity = (username != null && password != null);
+                logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
+                if (factoryConnection instanceof XAConnectionFactory) {
+                    final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
+                    sessionPool.jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
+                    isXAAware = true ;
+                    sessionPool.freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+                    sessionPool.inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+                } else if (factoryConnection instanceof ConnectionFactory) {
+                    final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
+                    sessionPool.jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
+                }
+
+                sessionPool.jmsConnection.setExceptionListener(new ExceptionListener() {
+                    public void onException(JMSException arg0)
                     {
-                        wait(delay) ;
+                        // This will result in all connections (and their sessions) on this pool
+                        // being closed...
+                        cleanSessionPool() ;
                     }
-                    catch (final InterruptedException ie) {} // ignore
-                }
+                }) ;
+                sessionPool.jmsConnection.start();
+
+                // And add it to the pool...
+                sessionPools.add(sessionPool);
+
+                return sessionPool;
+            } finally {
+                NamingContextPool.releaseNamingContext(jndiContext) ;
             }
+        } catch (final NamingContextException nce) {
+            throw new ConnectionException("Unexpected exception accessing Naming Context", nce) ;
         }
     }
+
     /**
      *  This method can be called whenever a Session is needed from the pool.
      * @return
@@ -337,36 +374,11 @@
      */
     synchronized void handleCloseSession(final JmsSession session)
     {
-        if (session.isSuspect())
-        {
-            logger.debug("Session is suspect, dropping") ;
-            handleReleaseSession(session) ;
+        JmsSessionPool sessionPool = findOwnerPool(session);
+
+        if(sessionPool != null) {
+            sessionPool.handleCloseSession(session);
         }
-        else
-        {
-            if (session.getId() != id)
-            {
-                logger.debug("Session is from a previous incarnation, dropping") ;
-            }
-            else
-            {
-                final int mode ;
-                try {
-                    mode = session.getAcknowledgeMode() ;
-                } catch (final JMSException jmse) {
-                    logger.warn("JMSException while calling getAcknowledgeMode") ;
-                    logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
-                    return ;
-                }
-                
-                final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
-                if (sessions != null) {
-                    sessions.add(session) ;
-                }
-            }
-            session.releaseResources() ;
-            releaseInUseSession(session) ;
-        }
     }
     
     /**
@@ -390,19 +402,12 @@
      */
     private void releaseInUseSession(final JmsSession session)
     {
-        final int mode ;
-        try {
-            mode = session.getAcknowledgeMode() ;
-        } catch (final JMSException jmse) {
-            logger.warn("JMSException while calling getAcknowledgeMode") ;
-            logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
-            return ;
+        JmsSessionPool sessionPool = findOwnerPool(session);
+
+        if(sessionPool != null) {
+            sessionPool.releaseInUseSession(session);
         }
-        
-        final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
-        if (sessions != null) {
-            sessions.remove(session) ;
-        }
+
         notifyAll() ;
     }
     
@@ -435,34 +440,14 @@
      */
     private void cleanSessionPool()
     {
-        final Connection connection ;
-        synchronized(this)
-       {
-            if (terminated)
-            {
-                return ;
+        for(JmsSessionPool sessionPool : sessionPools) {
+            try {
+                sessionPool.cleanSessionPool();
+            } catch(Exception e) {
+                logger.error("Exception while cleaning JmsSessionPool.", e);
             }
-            id++ ;
-            for (List<JmsSession> list : freeSessionsMap.values())
-            {
-                list.clear() ;
-            }
-            for (List<JmsSession> list : inUseSessionsMap.values())
-            {
-                list.clear() ;
-            }
-            transactionsToSessions.clear() ;
-            sessionsToTransactions.clear() ;
-            
-            logger.debug("Cleared the session pool now closing the connection to the factory.");
-            connection = jmsConnection ;
-            jmsConnection = null ;
         }
-        if (connection!=null) {
-            try {
-                connection.close();
-            } catch (final Exception ex) {} // ignore
-        }
+        sessionPools.clear();
     }
     
     /**
@@ -471,19 +456,16 @@
      */
     public synchronized void removeSessionPool()
     {
-        freeSessionsMap = null ;
-        inUseSessionsMap = null ;
-        transactionsToSessions = null ;
-        sessionsToTransactions = null ;
-        
-        logger.debug("Emptied the session pool now closing the connection to the factory.");
-        if (jmsConnection!=null) {
+        for(JmsSessionPool sessionPool : sessionPools) {
             try {
-                jmsConnection.close();
-            } catch (final Exception ex) {} // ignore
-            jmsConnection=null;
-            terminated = true ;
+                sessionPool.removeSessionPool();
+            } catch(Exception e) {
+                logger.error("Exception while removing JmsSessionPool.", e);
+            }
         }
+
+        sessionPools.clear();
+        terminated = true ;
         JmsConnectionPoolContainer.removePool(poolKey);
     }
     /**
@@ -514,9 +496,13 @@
      * @return int	the number of in use sessions
      */
     public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
-        final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
-        final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
-        return numFreeSessions;
+        int count = 0;
+
+        for(JmsSessionPool sessionPool : sessionPools) {
+            count += sessionPool.getFreeSessionsInPool(acknowledgeMode);
+        }
+
+        return count;
     }
     
     /**
@@ -526,69 +512,13 @@
      * @return int	the number of in use sessions
      */
     public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
-        final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
-        final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
-        return numInUseSessions;
-    }
-    
-    /**
-     * Initialise the connection.
-     * @throws ConnectionException If the pool has already been terminated.
-     * @throws NamingContextException for errors obtaining a naming context
-     * @throws NamingException for errors accessing a naming context
-     * @throws JMSException for errors creating the connection
-     */
-    private synchronized void initConnection()
-        throws ConnectionException, NamingContextException, NamingException, JMSException
-    {
-        if (terminated)
-        {
-            throw new ConnectionException("Connection pool has been terminated") ;
+        int count = 0;
+
+        for(JmsSessionPool sessionPool : sessionPools) {
+            count += sessionPool.getInUseSessionsInPool(acknowledgeMode);
         }
-        
-        if (jmsConnection==null) {
-            JmsConnectionPoolContainer.addToPool(poolKey, this);
-            logger.debug("Creating a JMS Connection for poolKey : " + poolKey);
-            Properties jndiEnvironment = JmsConnectionPoolContainer.getJndiEnvironment(poolKey);
-            Context jndiContext = NamingContextPool.getNamingContext(jndiEnvironment);
-            try {
-                String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
-                Object factoryConnection=null;
 
-                try
-                {
-                    factoryConnection = jndiContext.lookup(connectionFactoryString);
-                } catch (NamingException ne) {
-                    logger.info("Received NamingException, refreshing context.");
-                    jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
-                    factoryConnection = jndiContext.lookup(connectionFactoryString);
-                }
-                final String username = poolKey.get( JMSEpr.JMS_SECURITY_PRINCIPAL_TAG );
-                final String password = poolKey.get( JMSEpr.JMS_SECURITY_CREDENTIAL_TAG );
-                boolean useJMSSecurity = (username != null && password != null);
-                logger.debug( "JMS Security principal [" + username + "] using JMS Security : " + useJMSSecurity );
-                if (factoryConnection instanceof XAConnectionFactory) {
-                    final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
-                    jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
-                    isXAAware = true ;
-                    freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
-                    inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
-                } else if (factoryConnection instanceof ConnectionFactory) {
-                    final ConnectionFactory factory = (ConnectionFactory)factoryConnection ;
-                    jmsConnection = useJMSSecurity ? factory.createConnection(username,password): factory.createConnection();
-                }
-                
-                jmsConnection.setExceptionListener(new ExceptionListener() {
-                    public void onException(JMSException arg0)
-                    {
-                        cleanSessionPool() ;
-                    }
-                }) ;
-                jmsConnection.start();
-            } finally {
-                NamingContextPool.releaseNamingContext(jndiContext) ;
-            }
-        }
+        return count;
     }
     
     /**
@@ -615,7 +545,15 @@
         throws ConnectionException
     {
         final Object tx = getTransaction() ;
-        return transactionsToSessions.get(tx) ;
+
+        for(JmsSessionPool sessionPool : sessionPools) {
+            JmsXASession session = sessionPool.transactionsToSessions.get(tx);
+            if(session != null) {
+                return session;
+            }
+        }
+
+        return null;
     }
     
     /**
@@ -626,25 +564,36 @@
     synchronized void associateTransaction(final JmsXASession session)
         throws ConnectionException
     {
-        final Object tx = getTransaction() ;
-        if (tx == null)
-        {
-            throw new ConnectionException("No active transaction") ;
+        JmsSessionPool sessionPool = findOwnerPool(session);
+
+        if(sessionPool != null) {
+            sessionPool.associateTransaction(session);
         }
-        transactionsToSessions.put(tx, session) ;
-        sessionsToTransactions.put(session, tx) ;
     }
-    
+
     /**
      * Disassociate the JMS XA Session from a transaction.
      * @param session The XA session.
      */
     synchronized void disassociateTransaction(final JmsXASession session)
     {
-        final Object tx = sessionsToTransactions.remove(session) ;
-        transactionsToSessions.remove(tx) ;
+        JmsSessionPool sessionPool = findOwnerPool(session);
+
+        if(sessionPool != null) {
+            sessionPool.disassociateTransaction(session);
+        }
     }
-    
+
+    synchronized JmsSessionPool findOwnerPool(final JmsSession session) {
+        for(JmsSessionPool sessionPool : sessionPools) {
+            if(sessionPool.isOwnerPool(session)) {
+                return sessionPool;
+            }
+        }
+
+        return null;
+    }
+
     static
     {
     	PropertyManager prop = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE);
@@ -676,9 +625,322 @@
     		}
     	}
     }
-    
+
+    class JmsSessionPool {
+
+        /** Reference to a Queue or Topic Connection, we only need one per pool */
+        protected Connection jmsConnection ;
+
+        /** Free sessions in the pool that can be given out, indexed by acknowledge mode. */
+        private Map<Integer,ArrayList<JmsSession>> freeSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
+
+        /** Sessions that are currently in use, indexed by acknowledge mode. */
+        private Map<Integer,ArrayList<JmsSession>> inUseSessionsMap = new HashMap<Integer,ArrayList<JmsSession>>();
+
+        /**
+         * Mapping from transactions to sessions.
+         */
+        private Map<Object, JmsXASession> transactionsToSessions = new HashMap<Object, JmsXASession>() ;
+
+        /**
+         * Mapping from sessions to transactions.
+         */
+        private Map<JmsXASession, Object> sessionsToTransactions = new HashMap<JmsXASession, Object>() ;
+
+        private JmsSessionPool() {
+            freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+
+            inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+        }
+
+        private boolean isOwnerPool(JmsSession session) {
+            if (isInList(session, freeSessionsMap.values())) {
+                return true;
+            } else if (isInList(session, inUseSessionsMap.values())) {
+                return true;
+            }
+
+            return false;
+        }
+
+        private boolean isInList(JmsSession session, Collection<ArrayList<JmsSession>> sessionLists) {
+            for(ArrayList<JmsSession> sessionList : sessionLists) {
+                if(sessionList.contains(session)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public synchronized void removeSessionPool() {
+            freeSessionsMap = null ;
+            inUseSessionsMap = null ;
+            transactionsToSessions = null ;
+            sessionsToTransactions = null ;
+
+            logger.debug("Emptied the session pool now closing the connection to the factory.");
+            if (jmsConnection!=null) {
+                try {
+                    jmsConnection.close();
+                } catch (final Exception ex) {} // ignore
+                jmsConnection=null;
+            }
+        }
+
+        public synchronized JmsSession getSession(int acknowledgeMode, boolean transacted) throws JMSException {
+
+            ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode );
+            ArrayList<JmsSession> inUseSessions = inUseSessionsMap.get(acknowledgeMode);
+
+            if (freeSessions.size() > 0)
+            {
+                final JmsSession session = freeSessions.remove(freeSessions.size()-1);
+                inUseSessions.add(session);
+                return session ;
+            } else if (getSessionsInPool() < maxSessionsPerConnection) {
+                final JmsSession session = addAnotherSession(poolKey, transacted, acknowledgeMode);
+                if(session != null) {
+                    inUseSessions.add(session);
+                    return session ;
+                }
+            }
+
+            return null;
+        }
+
+
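+        /**
+         * Creates a new session on this pool's connection via the completion service.
+         * @param poolKey The pool key (not referenced by this method).
+         * @param transacted True to create an XA session, false for a plain session.
+         * @param acknowledgeMode The acknowledge mode used when creating a non-XA session.
+         * @throws JMSException If the session creation task fails with a JMSException.
+         */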
+        private  synchronized JmsSession addAnotherSession(Map<String, String> poolKey, final boolean transacted, final int acknowledgeMode)
+            throws JMSException
+        {
+            // Sessions need to be created in this way because of an issue with JBM.
+            // See https://jira.jboss.org/jira/browse/JBESB-1799
+            final Future<JmsSession> future = COMPLETION_SERVICE.submit(new Callable<JmsSession>() {
+                public JmsSession call()
+                    throws JMSException
+                {
+                    final JmsSession session ;
+
+                    if (transacted) {
+                        session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
+                    } else {
+                        session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
+                    }
+
+                    return session ;
+                }
+            }) ;
+
+            //Create a new Session
+            ArrayList<JmsSession> freeSessions = freeSessionsMap.get(acknowledgeMode);
+            // For now we only support JTA transacted sessions
+            try
+            {
+                JmsSession session = future.get();
+                logger.debug("Number of Sessions in the pool with acknowledgeMode: " + acknowledgeMode + " is now " + getSessionsInPool(acknowledgeMode));
+                return session;
+            }
+            catch (final InterruptedException ie) {} // ignore
+            catch (final ExecutionException ee)
+            {
+                final Throwable th = ee.getCause() ;
+                if (th instanceof JMSException)
+                {
+                    throw (JMSException)th ;
+                }
+                if (th instanceof Error)
+                {
+                    throw (Error)th ;
+                }
+                if (th instanceof RuntimeException)
+                {
+                    throw (RuntimeException)th ;
+                }
+            }
+
+            return null;
+        }
+
+        public void cleanSessionPool() {
+            final Connection connection ;
+            synchronized(this)
+           {
+                if (terminated)
+                {
+                    return ;
+                }
+                id++ ;
+                for (List<JmsSession> list : freeSessionsMap.values())
+                {
+                    list.clear() ;
+                }
+                for (List<JmsSession> list : inUseSessionsMap.values())
+                {
+                    list.clear() ;
+                }
+                transactionsToSessions.clear() ;
+                sessionsToTransactions.clear() ;
+
+                logger.debug("Cleared the session pool now closing the connection to the factory.");
+                connection = jmsConnection ;
+                jmsConnection = null ;
+            }
+            if (connection!=null) {
+                try {
+                    connection.close();
+                } catch (final Exception ex) {} // ignore
+            }
+        }
+
+        /**
+         * Returns the total number of sessions in the pool.
+         * @return The total number of sessions in the pool.
+         */
+        private synchronized int getSessionsInPool() {
+            int total = 0;
+
+            total += getSessionsInMap(freeSessionsMap);
+            total += getSessionsInMap(inUseSessionsMap);
+
+            return total;
+        }
+
+        private synchronized int getSessionsInMap(Map<Integer,ArrayList<JmsSession>> sessionsMap) {
+            Collection<ArrayList<JmsSession>> sessionLists = sessionsMap.values();
+            int total = 0;
+
+            for(ArrayList<JmsSession> sessionList : sessionLists) {
+                total += sessionList.size();
+            }
+
+            return total;
+        }
+
+        /**
+         * Returns the total number of sessions for the specified acknowledge mode.
+         *
+         * @param acknowledgeMode the acknowledge mode of sessions
+         * @return the number of free plus in-use sessions with that acknowledge mode
+         */
+        public synchronized int getSessionsInPool(final int acknowledgeMode) {
+            return getFreeSessionsInPool(acknowledgeMode) + getInUseSessionsInPool(acknowledgeMode) ;
+        }
+
+        /**
+         * Get the number of free sessions created with the specified acknowledge mode
+         * @param acknowledgeMode the acknowledge mode of sessions
+         * @return int	the number of free sessions
+         */
+        public synchronized int getFreeSessionsInPool(final int acknowledgeMode) {
+            final ArrayList<JmsSession> freeSessionMap = (freeSessionsMap == null ? null : freeSessionsMap.get(acknowledgeMode)) ;
+            final int numFreeSessions = (freeSessionMap == null ? 0 : freeSessionMap.size()) ;
+            return numFreeSessions;
+        }
+
+        /**
+         * Get the number of sessions that are in use and that were
+         * created with the specified acknowledge mode
+         * @param acknowledgeMode the acknowledge mode of sessions
+         * @return int	the number of in use sessions
+         */
+        public synchronized int getInUseSessionsInPool(final int acknowledgeMode) {
+            final ArrayList<JmsSession> inUseSessionMap = (inUseSessionsMap == null ? null : inUseSessionsMap.get(acknowledgeMode)) ;
+            final int numInUseSessions = (inUseSessionMap == null ? 0 : inUseSessionMap.size()) ;
+            return numInUseSessions;
+        }
+
+        /**
+         * Associate the JMS XA Session with the current transaction.
+         * @param session The XA session.
+         * @throws ConnectionException if there is no transaction active.
+         */
+        synchronized void associateTransaction(final JmsXASession session)
+            throws ConnectionException
+        {
+            final Object tx = getTransaction() ;
+            if (tx == null)
+            {
+                throw new ConnectionException("No active transaction") ;
+            }
+            transactionsToSessions.put(tx, session) ;
+            sessionsToTransactions.put(session, tx) ;
+        }
+
+        /**
+         * Disassociate the JMS XA Session from a transaction.
+         * @param session The XA session.
+         */
+        synchronized void disassociateTransaction(final JmsXASession session)
+        {
+            final Object tx = sessionsToTransactions.remove(session) ;
+            transactionsToSessions.remove(tx) ;
+        }
+
+        public void releaseInUseSession(JmsSession session) {
+            final int mode ;
+            try {
+                mode = session.getAcknowledgeMode() ;
+            } catch (final JMSException jmse) {
+                logger.warn("JMSException while calling getAcknowledgeMode") ;
+                logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+                return ;
+            }
+
+            final ArrayList<JmsSession> sessions = (inUseSessionsMap == null ? null : inUseSessionsMap.get(mode));
+            if (sessions != null) {
+                sessions.remove(session) ;
+            }
+        }
+
+        public void handleCloseSession(JmsSession session) {
+            if (session.isSuspect())
+            {
+                logger.debug("Session is suspect, dropping") ;
+                handleReleaseSession(session) ;
+            }
+            else
+            {
+                if (session.getId() != id)
+                {
+                    logger.debug("Session is from a previous incarnation, dropping") ;
+                }
+                else
+                {
+                    final int mode ;
+                    try {
+                        mode = session.getAcknowledgeMode() ;
+                    } catch (final JMSException jmse) {
+                        logger.warn("JMSException while calling getAcknowledgeMode") ;
+                        logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+                        return ;
+                    }
+
+                    final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+                    if (sessions != null) {
+                        sessions.add(session) ;
+                    }
+                }
+                session.releaseResources() ;
+                releaseInUseSession(session) ;
+            }
+        }
+    }
+
     /**
      * Thread factory returning daemon threads.
+     * <p/>
+     * Required as part of the fix for https://jira.jboss.org/jira/browse/JBESB-1799
+     *
      * @author kevin
      */
     private static final class DaemonThreadFactory implements ThreadFactory
@@ -687,7 +949,7 @@
          * The default executor factory.
          */
         private final ThreadFactory defaultFactory = Executors.defaultThreadFactory() ;
-        
+
         /**
          * Return a new daemon thread.
          * @param runnable The runnable associated with the thread.

Modified: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java	2009-05-15 15:16:23 UTC (rev 26564)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/JMSEpr.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -75,9 +75,11 @@
 	public static final String DESTINATION_NAME_TAG = "destination-name";
 
 	public static final String CONNECTION_FACTORY_TAG = "connection-factory";
-	
-	public static final String JNDI_PKG_PREFIX_TAG = "jndi-pkg-prefix";
 
+    public static final String MAX_SESSIONS_PER_CONNECTION = "max-sessions-per-connection";
+
+    public static final String JNDI_PKG_PREFIX_TAG = "jndi-pkg-prefix";
+
 	public static final String JNDI_URL_TAG = "jndi-URL";
 
 	public static final String JNDI_CONTEXT_FACTORY_TAG = "jndi-context-factory";

Copied: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java (from rev 26274, labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JBossActivationMapper.java)
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java	                        (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,104 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.soa.esb.listeners.jca;
+
+import java.util.Map;
+
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.jboss.soa.esb.ConfigurationException;
+
+
+/**
+ * Activation mapper implementation for IBM MQ (aka WMQ).
+ *
+ * @author <a href="kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class WMQActivationMapper implements ActivationMapper
+{
+    /**
+     * Initialise the destination name in the activation configuration.
+     * @param activationConfig The current activation configuration.
+     * @param name The destination name.
+     * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+     */
+    public void setDestination(final Map<String, String> activationConfig, final String name)
+        throws ConfigurationException
+    {
+        activationConfig.put("destination", name) ;
+    }
+
+    /**
+     * Initialise the provider adapter JNDI value in the activation configuration.
+     * @param activationConfig The current activation configuration.
+     * @param providerAdapterJNDI The provider adapter JNDI value or null if not specified.
+     * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+     */
+    public void setProviderAdapterJNDI(final Map<String, String> activationConfig, final String providerAdapterJNDI)
+        throws ConfigurationException
+    {
+        if (providerAdapterJNDI != null)
+        {
+            activationConfig.put("providerAdapterJNDI", providerAdapterJNDI) ;
+        }
+    }
+
+    /**
+     * Initialise the destination type in the activation configuration.
+     * @param activationConfig The current activation configuration.
+     * @param queue True if specifying a JMS Queue, false if specifying a JMS Topic.
+     * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+     */
+    public void setDestinationType(final Map<String, String> activationConfig, final boolean queue)
+        throws ConfigurationException
+    {
+        activationConfig.put("destinationType", queue ? Queue.class.getName() :Topic.class.getName()) ;
+    }
+
+    /**
+     * Initialise the message selector in the activation configuration.
+     * @param activationConfig The current activation configuration.
+     * @param messageSelector The message selector or null if not specified.
+     * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+     */
+    public void setMessageSelector(final Map<String, String> activationConfig, final String messageSelector)
+        throws ConfigurationException
+    {
+        if (messageSelector != null)
+        {
+            activationConfig.put("messageSelector", messageSelector) ;
+        }
+    }
+
+    /**
+     * Initialise the maximum thread count in the activation configuration (a no-op for WMQ).
+     * @param activationConfig The current activation configuration.
+     * @param maxThreads The maximum thread value or null if not specified.
+     * @throws org.jboss.soa.esb.ConfigurationException For invalid configuration.
+     */
+    public void setMaxThreads(final Map<String, String> activationConfig, final Integer maxThreads)
+        throws ConfigurationException
+    {
+        // Not applicable to WMQ!!
+    }
+}
\ No newline at end of file


Property changes on: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/src/org/jboss/soa/esb/listeners/jca/WMQActivationMapper.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java	                        (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import junit.framework.TestCase;
+
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.internal.soa.esb.rosetta.pooling.jms.MockJMSConnectionFactory;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MaxSessionsPerConnectionUnitTest extends TestCase {
+
+    private Map<String, String> jndiEnv = new HashMap<String, String>();
+
+    public MaxSessionsPerConnectionUnitTest() {
+        jndiEnv.put(Context.INITIAL_CONTEXT_FACTORY, MockInitialContextFactory.class.getName());
+        jndiEnv.put(JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+        jndiEnv.put(JMSEpr.MAX_SESSIONS_PER_CONNECTION, "2");
+    }
+
+    public void test() throws NamingException, JMSException, ConnectionException {
+        MockJndiContextHandler.objects.put("ConnectionFactory", new MockJMSConnectionFactory(2));
+        JmsConnectionPool connPool = new JmsConnectionPool(jndiEnv);
+        List<JmsConnectionPool.JmsSessionPool> sessPools = connPool.getSessionPools();
+
+        assertEquals(20, connPool.getMaxSessions());
+        assertEquals(2, connPool.getMaxSessionsPerConnection());
+        assertEquals(10, connPool.getMaxConnections());
+        assertEquals(0, sessPools.size());
+
+        try {
+            JmsSession session1 = connPool.getSession();
+
+            // Just get 1 session.  Make sure it's returned to the
+            // pool on closing...
+            assertEquals(1, sessPools.size());
+            assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            connPool.handleCloseSession(session1);
+            assertEquals(1, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+            JmsSession session2 = connPool.getSession();
+
+            // Now try another session.  Should be same session instance as last time.
+            // Make sure it's returned to the pool on closing...
+            assertTrue(session1 == session2);
+            assertEquals(1, sessPools.size());
+            assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            connPool.handleCloseSession(session2);
+            assertEquals(1, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+            // Now try 3 sessions.  Should cause another pool to be created.
+            // Make sure it's returned to the pool on closing...
+            session1 = connPool.getSession();
+            session2 = connPool.getSession();
+            JmsSession session3 = connPool.getSession();
+
+            assertEquals(2, sessPools.size());
+            assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(2, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPools.get(1).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(1, sessPools.get(1).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            connPool.handleCloseSession(session1);
+            connPool.handleCloseSession(session2);
+            connPool.handleCloseSession(session3);
+            assertEquals(2, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(1, sessPools.get(1).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPools.get(1).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+
+            List<JmsSession> sessions = getSessions(connPool, 20);
+            assertEquals(10, sessPools.size());
+            assertAllInUse(sessPools);
+            closeAll(sessions, connPool);
+            assertAllFree(sessPools);
+        } finally {
+            connPool.removeSessionPool();
+        }
+    }
+
+    private void closeAll(List<JmsSession> sessions, JmsConnectionPool connPool) {
+        for(JmsSession session : sessions) {
+            connPool.handleCloseSession(session);
+        }
+    }
+
+    private void assertAllInUse(List<JmsConnectionPool.JmsSessionPool> sessPools) {
+        for(JmsConnectionPool.JmsSessionPool sessPool : sessPools) {
+            assertEquals(0, sessPool.getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(2, sessPool.getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+        }
+    }
+
+    private void assertAllFree(List<JmsConnectionPool.JmsSessionPool> sessPools) {
+        for(JmsConnectionPool.JmsSessionPool sessPool : sessPools) {
+            assertEquals(2, sessPool.getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+            assertEquals(0, sessPool.getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));
+        }
+    }
+
+    private List<JmsSession> getSessions(JmsConnectionPool connPool, int numSessions) throws NamingException, JMSException, ConnectionException {
+        List<JmsSession> sessions = new ArrayList<JmsSession>();
+        for(int i = 0; i < numSessions; i++) {
+            sessions.add(connPool.getSession());
+        }
+        return sessions;
+    }
+}

Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java	                        (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockInitialContextFactory.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.util.Hashtable;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockInitialContextFactory implements InitialContextFactory {
+    
+    public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException {
+        return MockJndiContextHandler.getInstance();
+    }
+}

Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java	                        (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MockJndiContextHandler.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import javax.naming.Context;
+import javax.naming.NameNotFoundException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockJndiContextHandler implements InvocationHandler {
+
+    public static Map<String, Object> objects = new HashMap<String, Object>();
+
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+        if(method.getName().equals("lookup")) {
+            String name = (String) args[0];
+            Object object = objects.get(name);
+
+            if(object == null) {
+                throw new NameNotFoundException("Obect '" + name + "' not found.");
+            }
+
+            return object;            
+        }
+
+        return null;
+    }
+
+    public static Context getInstance() {
+        return (Context) Proxy.newProxyInstance(Context.class.getClassLoader(),
+                new Class[]{Context.class},
+                new MockJndiContextHandler());
+    }
+}

Added: labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java
===================================================================
--- labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java	                        (rev 0)
+++ labs/jbossesb/workspace/TF_JmsConnectionPool/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/jms/MockJMSConnectionFactory.java	2009-05-15 15:21:08 UTC (rev 26565)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.rosetta.pooling.jms;
+
+import org.jboss.internal.soa.esb.rosetta.pooling.MockJndiContextHandler;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.ArrayList;
+
+/** Mock JMS {@link ConnectionFactory} whose connections are dynamic proxies that cap how many sessions each connection will create.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockJMSConnectionFactory implements ConnectionFactory {
+    /** Session cap passed to every connection handler; it is only enforced when greater than zero. */
+    private int maxSessionsPerConnection;
+    /** @param maxSessionsPerConnection sessions allowed per connection; zero or a negative value disables the limit */
+    public MockJMSConnectionFactory(int maxSessionsPerConnection) {
+        this.maxSessionsPerConnection = maxSessionsPerConnection;
+    }
+    /** Returns a new mock connection proxy. */
+    public Connection createConnection() throws JMSException {
+        return createConnectionHandler();
+    }
+    /** Same as {@link #createConnection()}; both arguments are ignored. */
+    public Connection createConnection(String s, String s1) throws JMSException {
+        return createConnectionHandler();
+    }
+    /** Builds a {@link Connection} proxy backed by its own MockJMSConnectionHandler, so sessions are counted per connection. */
+    private Connection createConnectionHandler() {
+        return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
+                new Class[]{Connection.class},
+                new MockJMSConnectionHandler(maxSessionsPerConnection));
+    }
+    /** Handler for one mock connection: creates session proxies up to the cap, logs every other call and answers it with null. */
+    private class MockJMSConnectionHandler implements InvocationHandler {
+        // NOTE(review): sessions are only ever added to the list below; nothing in this handler removes them, so closed sessions keep counting toward the cap.
+        private int maxSessionsPerConnection;
+        private List<Session> sessions = new ArrayList<Session>();
+        /** @param maxSessionsPerConnection cap for this connection; values of zero or less mean no limit */
+        public MockJMSConnectionHandler(int maxSessionsPerConnection) {
+            this.maxSessionsPerConnection = maxSessionsPerConnection;
+        }
+        /** Serves "createSession" (throwing JMSException once the cap is reached); any other method is logged to stdout and returns null. */
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            // Matched by method name only; the call's arguments are not inspected.
+            if(method.getName().equals("createSession")) {
+                System.out.println("Creating JMS Session");
+                // The limit applies only for a positive cap, and trips when the number of sessions created so far equals it.
+                if(maxSessionsPerConnection > 0 && sessions.size() == maxSessionsPerConnection) {
+                    throw new JMSException("Unable to create JMS Session on Connection.  Maximum of " + maxSessionsPerConnection + " Sessions/Connection.");
+                }
+                // Each session is itself a proxy with its own MockJMSSessionHandler.
+                Session session = (Session) Proxy.newProxyInstance(Session.class.getClassLoader(),
+                        new Class[]{Session.class},
+                        new MockJMSSessionHandler());
+                // Remember the session so it counts toward the cap on later calls.
+                sessions.add(session);
+
+                return session;
+            }
+            // Everything else on the connection is just logged.
+            System.out.println("MockJMSConnectionHandler: Call to " + method.getName());
+
+            return null;
+        }
+    }
+    /** Handler for a mock session: reports AUTO_ACKNOWLEDGE as the acknowledge mode, logs every other call and answers it with null. */
+    private class MockJMSSessionHandler implements InvocationHandler {
+        // NOTE(review): returning null for a method whose return type is primitive makes the proxy throw NullPointerException - confirm callers avoid such methods.
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+            if(method.getName().equals("getAcknowledgeMode")) {
+                return Session.AUTO_ACKNOWLEDGE;
+            }
+            // Any other session method is only logged.
+            System.out.println("MockJMSSessionHandler: Call to " + method.getName());
+
+            return null;
+        }
+    }
+}




More information about the jboss-svn-commits mailing list