[jboss-svn-commits] JBL Code SVN: r10187 - in labs/jbossesb/trunk: product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Mar 14 15:50:44 EDT 2007


Author: kurt.stam at jboss.com
Date: 2007-03-14 15:50:44 -0400 (Wed, 14 Mar 2007)
New Revision: 10187

Added:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
   labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/
   labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
Modified:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
Log:
Adding JMS Pooling. The JMSNotifiers are hooked up. The JMSCouriers still need to be worked on.

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,50 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+/**
+ * This exception is thrown when Connection Pool fails to initialize
+ * @author kstam
+ * Date: March 10, 2007
+ */
+public class ConnectionException extends Exception{
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public ConnectionException() {
+        super();
+    }
+
+    public ConnectionException(String message) {
+        super(message);
+    }
+
+    public ConnectionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ConnectionException(Throwable cause) {
+        super(cause);
+    }
+}


Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,222 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.helpers.NamingContext;
+
+/**
+ * Interface that needs to be implemented to provide pool of connections.
+ * @see DefaultConnectionPoolImpl
+ * Default implementation of Connection Pool
+ * @author kstam
+ * Date: March 10, 2007
+ */
+public class JmsConnectionPool
+{
+    /** Maximum number of Sessions that will be created in this pool */
+    private int MAX_SESSIONS=10;    //TODO Make this manageable
+    /** Number of free sessions in the pool that can be given out */
+    private ArrayList<Session> freeSessions = new ArrayList<Session>();
+    /** Number of session that are currently in use */
+    private ArrayList<Session> inUseSessions = new ArrayList<Session>();
+    /** Reference to a Queue or Topic Connection, we only need one per pool */
+    private Connection jmsConnection = null;
+    /** The Indentifier of the pool */
+    private Map<String, String> poolKey;
+    /** Logger */
+    private Logger logger = Logger.getLogger(this.getClass());
+    /**
+     * Contructor of the pool.
+     * 
+     */
+    public JmsConnectionPool(Map<String, String> poolKey) 
+    {
+        this.poolKey = poolKey;
+    }
+   
+    /**
+     * This is where we create the sessions. 
+     * 
+     * @param poolKey
+     * @throws NamingException
+     * @throws JMSException
+     * @throws ConnectionException
+     */
+    private  synchronized void addAnotherSession(Map<String, String> poolKey)
+    throws NamingException, JMSException, ConnectionException
+    {
+        String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
+        
+        //Setup a connection if we don't have one
+        if (jmsConnection==null) {
+            JmsConnectionPoolContainer.addToPool(poolKey, this);
+            logger.debug("Creating a JMS Connection for ");
+            Context jndiContext = NamingContext.getServerContext(poolKey.get(JMSEpr.JNDI_URL_TAG)
+                    , poolKey.get(JMSEpr.JNDI_CONTEXT_FACTORY_TAG), poolKey.get(JMSEpr.JNDI_PKG_PREFIX_TAG));
+            String connectionFactoryString = poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG);
+            Object factoryConnection = jndiContext.lookup(connectionFactoryString);
+            if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
+                QueueConnectionFactory factory = (QueueConnectionFactory) factoryConnection;
+                jmsConnection = factory.createQueueConnection();
+            } else {
+                TopicConnectionFactory factory = (TopicConnectionFactory) factoryConnection;
+                jmsConnection = factory.createTopicConnection();
+            }
+            jndiContext.close(); //TODO Make sure this is ok for MQSeries.
+        }
+        //Create a new Session
+        if (JMSEpr.QUEUE_TYPE.equals(destinationType)) {
+            logger.debug("Creating a new Queue session.");
+            QueueSession session = ((QueueConnection)jmsConnection).createQueueSession(false,
+                    QueueSession.AUTO_ACKNOWLEDGE);
+            freeSessions.add(session);
+        } else if (JMSEpr.TOPIC_TYPE.equals(destinationType)) {
+            logger.debug("Creating a new Topic session.");
+            TopicSession session = ((TopicConnection) jmsConnection).createTopicSession(false,
+                        TopicSession.AUTO_ACKNOWLEDGE);
+            freeSessions.add(session);
+        } else {
+            throw new ConnectionException("Unknown destination type");
+        }
+        logger.debug("Number of Sessions in the pool now is " + getSessionsInPool());
+    }
+
+    /**
+     *  This method can be called whenever a connection is needed from the pool.
+     *  
+     * @return Connection to be used
+     * @throws ConnectionException
+     */
+    private synchronized Session getSession() throws NamingException, JMSException, ConnectionException
+    {
+        Session session = null;
+        int waitInSeconds = 0;
+        while (session == null) {
+            if (freeSessions.size() > 0)
+            {
+                session = freeSessions.get(freeSessions.size()-1);
+                freeSessions.remove(session);
+                inUseSessions.add(session);
+            } else {
+                if (waitInSeconds++ > 30) { 
+//                  We'll give up after not be able to get a session for 30 seconds.
+                    throw new ConnectionException("Could not obtain a JMS connection from the pool after 30s.");
+                }
+                //Add a connection if we can
+                if (inUseSessions.size()<MAX_SESSIONS) {
+                    addAnotherSession(poolKey);
+                } else {
+                    try {
+                        //wait one second and try again.
+                        logger.info("The connection pool was exhausted. Waiting 1 second before trying again..");
+                        Thread.sleep(1000);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+        return session;
+    }
+    /**
+     *  This method can be called whenever a Queue Session is needed from the pool.
+     * @return
+     * @throws NamingException
+     * @throws JMSException
+     * @throws ConnectionException
+     */
+    public QueueSession getQueueSession() throws NamingException, JMSException, ConnectionException
+    {
+        return (QueueSession) getSession();
+    }
+    /**
+     * This method can be called whenever a Topic Session is needed from the pool.
+     * @return
+     * @throws NamingException
+     * @throws JMSException
+     * @throws ConnectionException
+     */
+    public TopicSession getTopicSession() throws NamingException, JMSException, ConnectionException
+    {
+        return (TopicSession) getSession();
+    }
+
+    /**
+     * This method closes an open connection and returns the connection to the pool.
+     * @param connectionToClose The connection to be returned to the pool.
+     * @throws SQLException
+     */
+    public  synchronized void closeSession(Session connectionToClose){
+        freeSessions.add(connectionToClose);
+        inUseSessions.remove(connectionToClose);
+    }
+
+    /**
+     * This method is called when the pool needs to destroyed. It closes all open sessions
+     * and the connection and removes it from the container's poolMap.
+     * 
+     * @throws ConnectionException
+     */
+    public synchronized void removeSessionPool() throws JMSException
+    {
+        for (Iterator<Session> i=freeSessions.iterator();i.hasNext();) {
+            i.next().close();
+            i.remove();
+        }
+        for (Iterator<Session> i=inUseSessions.iterator();i.hasNext();) {
+            i.next().close();
+            i.remove();
+        }
+        logger.debug("Emptied the session pool now closing the connection to the factory.");
+        if (jmsConnection!=null) {
+            jmsConnection.close();
+            jmsConnection=null;
+        }
+        JmsConnectionPoolContainer.removePool(poolKey);
+    }
+    /**
+     * Gets the total number of sessions in the pool.
+     * @return the session pool size
+     */
+    public int getSessionsInPool() {
+        return freeSessions.size() + inUseSessions.size();
+    }
+}


Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,138 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+
+/**
+ * This is a temporary pooling class until we start using JCA. It is designed to build
+ * a JMS session pool for each ConnectionFactory.
+ * 
+ * @author kstam
+ * Date: March 13, 2007
+ */
+public class JmsConnectionPoolContainer{
+    /**
+     * Container of all the pools in the system.
+     */
+    private static ConcurrentHashMap<Map<String, String>, JmsConnectionPool> poolMap 
+        = new ConcurrentHashMap<Map<String, String>, JmsConnectionPool>();
+   
+    /**
+     * Returns the pool given a JMSEpr.
+     * 
+     * @param jmsEpr
+     * @return JmsSessionPool
+     * @throws ConnectionException
+     * @throws URISyntaxException
+     */
+    public static JmsConnectionPool getPool(JMSEpr jmsEpr) throws ConnectionException, URISyntaxException
+    {
+        return getPool(jmsEpr.getJndiURL(), jmsEpr.getJndiContextFactory(), jmsEpr.getJndiPkgPrefix()
+                , jmsEpr.getConnectionFactory(), jmsEpr.getDestinationType());
+    }
+    /**
+     * Returns the pool given the identifiers for the JMS provider.
+     * 
+     * @param poolKey
+     * @return
+     * @throws ConnectionException
+     */
+    public static JmsConnectionPool getPool(String jndiURL, String jndiContextFactory,
+            String jndiPkgPrefix, String connectionFactory, String destinationType)
+    {
+        Map<String,String> poolKey = createPoolKey(jndiURL, jndiContextFactory,
+                jndiPkgPrefix, connectionFactory, destinationType);
+        if (poolMap.containsKey(poolKey)) {
+            return poolMap.get(poolKey);
+        } else {
+            JmsConnectionPool pool = new JmsConnectionPool(poolKey);
+            poolMap.put(poolKey, pool);
+            return pool;
+        }
+    }
+    /**
+     * Creates a poolKey using the identifying parameters
+     * 
+     * @param jndiURL
+     * @param jndiContextFactory
+     * @param jndiPkgPrefix
+     * @param connectionFactory
+     * @param destinationType
+     * @param destinationName
+     * @return
+     */
+    public static Map<String, String> createPoolKey(String jndiURL, String jndiContextFactory,
+            String jndiPkgPrefix, String connectionFactory, String destinationType) 
+    {
+        Map<String,String> poolKey = new HashMap<String,String>();
+        if (jndiURL!=null)            poolKey.put(JMSEpr.JNDI_URL_TAG, jndiURL);
+        if (jndiContextFactory!=null) poolKey.put(JMSEpr.JNDI_CONTEXT_FACTORY_TAG, jndiContextFactory);
+        if (jndiPkgPrefix!=null)      poolKey.put(JMSEpr.JNDI_PKG_PREFIX_TAG, jndiPkgPrefix);
+        if (connectionFactory!=null)  poolKey.put(JMSEpr.CONNECTION_FACTORY_TAG, connectionFactory);
+        if (destinationType!=null)    poolKey.put(JMSEpr.DESTINATION_TYPE_TAG, destinationType);
+        return poolKey;
+    }
+    /**
+     * Gets the number of pools in the constainer.
+     * 
+     */
+    public static int getNumberOfPools() {
+        return poolMap.size();
+    }
+    /**
+     * Removes the poolKey from the poolMap.
+     * 
+     * @param poolKey
+     */
+    protected static void removePool(Map<String, String> poolKey) {
+        if (poolMap.containsKey(poolKey)) {
+            poolMap.remove(poolKey);
+        }
+    }
+    /**
+     * Adds this poolKey and pool to the the poolMap.
+     * 
+     * @param poolKey
+     */
+    protected static void addToPool(Map<String, String> poolKey, JmsConnectionPool pool) {
+        if (!poolMap.containsKey(poolKey)) {
+            poolMap.put(poolKey, pool);
+        }
+    }
+    /**
+     * Removes all pools from the container.
+     * 
+     */
+    public static void removeAllPools() throws JMSException{
+        for (Map<String,String> poolKey : poolMap.keySet()) {
+            poolMap.get(poolKey).removeSessionPool();
+        }
+    }
+}


Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java	2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,7 +26,6 @@
 import java.util.Iterator;
 import java.util.Properties;
 
-import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -36,6 +35,7 @@
 import javax.naming.NamingException;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.helpers.ConfigTree;
 
@@ -54,7 +54,7 @@
  */
 public abstract class NotifyJMS extends NotificationTarget
 {
-    private Logger log = Logger.getLogger(this.getClass());
+    protected Logger log = Logger.getLogger(this.getClass());
 	/**
 	 * Abstract method - All classes that extend NotifyJMS must implement it
 	 * according to their own javax.jms.Destination needs (Queue/Topic)
@@ -101,7 +101,7 @@
 	/**
 	 * The javax.jms.Connection instance used to talk to JMS
 	 */
-	protected Connection m_oConn;
+	protected JmsConnectionPool mPool;
 
 	/**
 	 * The javax.jms.Session instance used to talk to JMS
@@ -159,31 +159,8 @@
 	 */
 	public void release ()
 	{
-		if (null != m_oSess) try
-		{
-			m_oSess.close();
-		}
-		catch (Exception e1)
-		{
-            log.error(e1.getMessage(), e1);
-		}
-		if (null != m_oConn) try
-		{
-			m_oConn.close();
-		}
-		catch (Exception e2)
-		{
-            log.error(e2.getMessage(), e2);
-		}
-        if (null != m_oCtx) try
-        {
-            m_oCtx.close();
-        }
-        catch (NamingException ne) 
-        {
-            log.error(ne.getMessage(), ne);
-        }
-	} // __________________________________
+    	mPool.closeSession(m_oSess);
+	}
 
 	/**
 	 * Send a JMS message using p_o to fill in the message content and the list

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java	2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,14 +26,14 @@
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
 import javax.naming.NamingException;
 
-import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 
 ;
@@ -52,7 +52,6 @@
  */
 public class NotifyQueues extends NotifyJMS
 {
-	private Logger log = Logger.getLogger(this.getClass());
 	/**
 	 * Connection Factory JNDI name.
 	 */
@@ -67,21 +66,17 @@
 	 */
 	public static final String CHILD_QUEUE = "queue";
 
-	public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException
+	public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
 	{
 		super(p_oP);
 		setQueues(p_oP.getChildren(CHILD_QUEUE));
 	} // __________________________________
 
-	protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException
+	protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
 	{
 		try
 		{
-			QueueConnectionFactory qcf = lookupQueueConnectionFactory();
-			QueueConnection oQconn = qcf.createQueueConnection();
-			QueueSession oQsess = oQconn.createQueueSession(false,
-					QueueSession.AUTO_ACKNOWLEDGE);
-	
+            mPool = JmsConnectionPoolContainer.getPool(null, null, null, CONNECTION_FACTORY, JMSEpr.QUEUE_TYPE);
 			m_oaMssProd = new MessageProducer[p_oaP.length];
 			for (int i1 = 0; i1 < p_oaP.length; i1++)
 			{
@@ -89,36 +84,17 @@
 				if (null == sAtt) 
 					throw new ConfigurationException("Missing queue jndiName");
 				Queue oQ = (Queue) m_oCtx.lookup(sAtt);
-				m_oaMssProd[i1] = oQsess.createSender(oQ);
+                QueueSession queueSession = mPool.getQueueSession();
+				m_oaMssProd[i1] = queueSession.createSender(oQ);
+                m_oSess = queueSession;
 			}
-	
-			m_oConn = oQconn;
-			m_oSess = oQsess;
-			// m_oConn.start(); Only for incoming messages ???
 		}
 		catch (NamingException ex)
 		{
 			throw new ConfigurationException(ex);
 		}
 	}
-
 	/**
-	 * Get the {@link QueueConnectionFactory} to be used for this instance. <p/>
-	 * Can be overridden for testing.
-	 * 
-	 * @return The QueueConnectionFactory fro the JNDI context.
-	 * @throws NamingException
-	 */
-	protected QueueConnectionFactory lookupQueueConnectionFactory ()
-			throws NamingException
-	{
-		// REVIEW: The connection factory name is hardcoded and is the same as
-		// that of the topic connection factory.
-		// KS: Yes this should come from the configuration, the Queue maybe located on a different server.
-		return (QueueConnectionFactory) m_oCtx.lookup(CONNECTION_FACTORY);
-	}
-
-	/**
 	 * Send a message to all the configured queues.
 	 */
 	protected void sendToAll (Message p_oMsg)

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java	2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,13 +26,14 @@
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.naming.NamingException;
 
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 /**
  * 
@@ -45,32 +46,18 @@
 
 	public static final String CHILD_TOPIC = "topic";
 
-	public NotifyTopics (ConfigTree p_oP) throws ConfigurationException, JMSException
+	public NotifyTopics (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
 	{
 		super(p_oP);
 
 		setTopics(p_oP.getChildren(CHILD_TOPIC));
 	} // __________________________________
 
-	protected void setTopics (ConfigTree[] p_oaP) throws ConfigurationException, JMSException
+	protected void setTopics (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
 	{
 		// REVIEW: The connection factory name is hardcoded and is the same as
 		// that of the queue connection factory.
-		TopicConnectionFactory qcf = null;
-		
-		try
-		{
-			qcf = (TopicConnectionFactory) m_oCtx.lookup(CONNECTION_FACTORY);
-		}
-		catch (NamingException ex)
-		{
-			throw new ConfigurationException(ex);
-		}
-		
-		TopicConnection oTconn = qcf.createTopicConnection();
-		TopicSession oTsess = oTconn.createTopicSession(false,
-				TopicSession.AUTO_ACKNOWLEDGE);
-
+        mPool = JmsConnectionPoolContainer.getPool(null, null, null, CONNECTION_FACTORY, JMSEpr.TOPIC_TYPE);
 		m_oaMssProd = new MessageProducer[p_oaP.length];
 		
 		try
@@ -80,20 +67,17 @@
 				String sAtt = p_oaP[i1].getAttribute(ATT_DEST_NAME);
 				if (null == sAtt) throw new ConfigurationException("Missing topic jndiName");
 				Topic oT = (Topic) m_oCtx.lookup(sAtt);
-				m_oaMssProd[i1] = oTsess.createPublisher(oT);
+                TopicSession topicSession = mPool.getTopicSession();
+				m_oaMssProd[i1] = topicSession.createPublisher(oT);
+                m_oSess = topicSession;
 			}
 		}
 		catch (NamingException ex)
 		{
 			throw new ConfigurationException(ex);
 		}
+	}
 
-		m_oConn = oTconn;
-		m_oSess = oTsess;
-		// m_oConn.start(); Only for incoming messages ???
-
-	} // __________________________________
-
 	protected void sendToAll (Message p_oMsg)
 	{
 		for (int i1 = 0; i1 < m_oaMssProd.length; i1++)
@@ -106,6 +90,7 @@
 			}
 			catch (Exception e)
 			{
+                log.error(e.getMessage(), e);
 			}
 		}
 		release();

Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java	2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,107 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.soa.esb.rosetta.pooling;
+
+import javax.jms.Session;
+
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.helpers.NamingContext;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author kstam
+ *
+ */
+public class JmsConnectionPoolingTest {
+    
+    @Test
+    public void testPoolAndConnectionCreation() 
+    {
+        JmsConnectionPool jmsConnectionPool = null;
+        try {
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            assertEquals(0, jmsConnectionPool.getSessionsInPool());
+            //Open 3 concurrent sessions
+            Session session1 = jmsConnectionPool.getQueueSession();
+            assertEquals(1, jmsConnectionPool.getSessionsInPool());
+            Session session2 = jmsConnectionPool.getQueueSession();
+            assertEquals(2, jmsConnectionPool.getSessionsInPool());
+            Session session3 = jmsConnectionPool.getQueueSession();
+            assertEquals(3, jmsConnectionPool.getSessionsInPool());
+            //Close them
+            jmsConnectionPool.closeSession(session1);
+            jmsConnectionPool.closeSession(session2);
+            jmsConnectionPool.closeSession(session3);
+            assertEquals(3, jmsConnectionPool.getSessionsInPool());
+            //destroy this pool
+            jmsConnectionPool.removeSessionPool();
+            assertEquals(0, jmsConnectionPool.getSessionsInPool());
+            assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
+            
+            //Use it again and add one session
+            jmsConnectionPool.getQueueSession();
+            assertEquals(1, jmsConnectionPool.getSessionsInPool());
+            assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
+            //I should be able to remove the entire pool and have it do closing
+            //of the session.
+            jmsConnectionPool.removeSessionPool();
+        } catch (Exception e) {
+            assertTrue(false);
+            e.printStackTrace();
+        }
+    }
+    
+    @Test
+    public void testCreateSecondPool()
+    {
+        
+        try {
+            JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            //This should be the same pool
+            assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
+        
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+            //This should be a different pool, so now we should have 2.
+            assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+            
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, null, null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+            //This should be a different pool, so now we should have 3.
+            assertEquals(3, JmsConnectionPoolContainer.getNumberOfPools());
+            
+            //Now lets cleanup after ourselves
+            jmsConnectionPool.removeSessionPool();
+            assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+            
+            //Let's clean the rest up with a removeAll.
+            JmsConnectionPoolContainer.removeAllPools();
+            assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools()); 
+        } catch (Exception e) {
+            assertTrue(false);
+            e.printStackTrace();
+        }
+    }
+}


Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
___________________________________________________________________
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list