[jboss-svn-commits] JBL Code SVN: r10187 - in labs/jbossesb/trunk: product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Mar 14 15:50:44 EDT 2007
Author: kurt.stam at jboss.com
Date: 2007-03-14 15:50:44 -0400 (Wed, 14 Mar 2007)
New Revision: 10187
Added:
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
Modified:
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
Log:
Adding JMS Pooling. The JMSNotifiers are hooked up. The JMSCouriers still need to be worked on.
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,72 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+/**
+ * Checked exception raised by the JMS connection pool, for example when the
+ * pool cannot be set up or cannot hand out a session.
+ *
+ * @author kstam
+ * Date: March 10, 2007
+ */
+public class ConnectionException extends Exception
+{
+    /** Version identifier used by Java serialization. */
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception that carries neither a message nor a cause. */
+    public ConnectionException()
+    {
+        super();
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     *
+     * @param message description of what went wrong
+     */
+    public ConnectionException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps another throwable.
+     *
+     * @param cause the underlying problem
+     */
+    public ConnectionException(Throwable cause)
+    {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and the throwable behind it.
+     *
+     * @param message description of what went wrong
+     * @param cause the underlying problem
+     */
+    public ConnectionException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/ConnectionException.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,268 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.helpers.NamingContext;
+
+/**
+ * Pool of JMS sessions that share one JMS connection. Sessions are created
+ * on demand, up to a fixed maximum, and are handed out again after they have
+ * been returned through {@link #closeSession(Session)}. A pool is identified
+ * by its pool key and is registered in {@link JmsConnectionPoolContainer}.
+ *
+ * @author kstam
+ * Date: March 10, 2007
+ */
+public class JmsConnectionPool
+{
+    /** Maximum number of Sessions that will be created in this pool */
+    private static final int MAX_SESSIONS=10; //TODO Make this manageable
+    /** Longest time in milliseconds a caller waits for a session to become free */
+    private static final long MAX_WAIT_MILLIS=30000L;
+    /** Free sessions in the pool that can be given out */
+    private final ArrayList<Session> freeSessions = new ArrayList<Session>();
+    /** Sessions that are currently in use */
+    private final ArrayList<Session> inUseSessions = new ArrayList<Session>();
+    /** Reference to a Queue or Topic Connection, we only need one per pool */
+    private Connection jmsConnection = null;
+    /** The Identifier of the pool */
+    private final Map<String, String> poolKey;
+    /** Logger */
+    private final Logger logger = Logger.getLogger(this.getClass());
+
+    /**
+     * Constructor of the pool. The connection and the sessions are created
+     * later, when the first session is requested.
+     *
+     * @param poolKey the settings that identify the JMS provider for this pool
+     */
+    public JmsConnectionPool(Map<String, String> poolKey)
+    {
+        this.poolKey = poolKey;
+    }
+
+    /**
+     * This is where we create the sessions. The shared JMS connection is
+     * opened first when the pool does not have one yet.
+     *
+     * @param poolKey the settings used to reach the connection factory
+     * @throws NamingException if the JNDI context or the factory lookup fails
+     * @throws JMSException if the connection or the session cannot be created
+     * @throws ConnectionException if the destination type is not queue or topic
+     */
+    private synchronized void addAnotherSession(Map<String, String> poolKey)
+        throws NamingException, JMSException, ConnectionException
+    {
+        String destinationType = poolKey.get(JMSEpr.DESTINATION_TYPE_TAG);
+        boolean isQueue = JMSEpr.QUEUE_TYPE.equals(destinationType);
+        //Reject an unknown type before anything is opened, otherwise a bad key
+        //would leave an unused connection behind.
+        if (!isQueue && !JMSEpr.TOPIC_TYPE.equals(destinationType)) {
+            throw new ConnectionException("Unknown destination type");
+        }
+        //Setup a connection if we don't have one
+        if (jmsConnection==null) {
+            JmsConnectionPoolContainer.addToPool(poolKey, this);
+            logger.debug("Creating a JMS Connection for " + poolKey);
+            Context jndiContext = NamingContext.getServerContext(poolKey.get(JMSEpr.JNDI_URL_TAG)
+                , poolKey.get(JMSEpr.JNDI_CONTEXT_FACTORY_TAG), poolKey.get(JMSEpr.JNDI_PKG_PREFIX_TAG));
+            try {
+                Object factory = jndiContext.lookup(poolKey.get(JMSEpr.CONNECTION_FACTORY_TAG));
+                if (isQueue) {
+                    jmsConnection = ((QueueConnectionFactory) factory).createQueueConnection();
+                } else {
+                    jmsConnection = ((TopicConnectionFactory) factory).createTopicConnection();
+                }
+            } finally {
+                //Close the context even when the lookup or the cast fails.
+                jndiContext.close(); //TODO Make sure this is ok for MQSeries.
+            }
+        }
+        //Create a new Session
+        Session session;
+        if (isQueue) {
+            logger.debug("Creating a new Queue session.");
+            session = ((QueueConnection) jmsConnection).createQueueSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        } else {
+            logger.debug("Creating a new Topic session.");
+            session = ((TopicConnection) jmsConnection).createTopicSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        }
+        freeSessions.add(session);
+        logger.debug("Number of Sessions in the pool now is " + getSessionsInPool());
+    }
+
+    /**
+     * This method can be called whenever a session is needed from the pool.
+     * A new session is created while the pool is not full. When all sessions
+     * are in use the caller waits, for at most 30 seconds, until one is
+     * returned through {@link #closeSession(Session)}.
+     *
+     * @return Session to be used
+     * @throws NamingException if the connection factory cannot be looked up
+     * @throws JMSException if a connection or session cannot be created
+     * @throws ConnectionException if no session became free in time, or if
+     *         the calling thread was interrupted while waiting
+     */
+    private synchronized Session getSession() throws NamingException, JMSException, ConnectionException
+    {
+        long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
+        while (freeSessions.isEmpty()) {
+            //Add a connection if we can
+            if (inUseSessions.size()<MAX_SESSIONS) {
+                addAnotherSession(poolKey);
+                continue;
+            }
+            long remaining = deadline - System.currentTimeMillis();
+            if (remaining <= 0) {
+                //We'll give up after not being able to get a session for 30 seconds.
+                throw new ConnectionException("Could not obtain a JMS connection from the pool after 30s.");
+            }
+            try {
+                //wait() gives up the monitor so that closeSession can run in
+                //another thread; Thread.sleep() would keep it locked out.
+                logger.info("The connection pool was exhausted. Waiting 1 second before trying again..");
+                wait(Math.min(remaining, 1000L));
+            } catch (InterruptedException e) {
+                //Keep the interrupt visible to the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                throw new ConnectionException("Interrupted while waiting for a JMS session.", e);
+            }
+        }
+        Session session = freeSessions.remove(freeSessions.size()-1);
+        inUseSessions.add(session);
+        return session;
+    }
+
+    /**
+     * This method can be called whenever a Queue Session is needed from the pool.
+     *
+     * @return a session of this pool, cast to QueueSession
+     * @throws NamingException if the connection factory cannot be looked up
+     * @throws JMSException if a connection or session cannot be created
+     * @throws ConnectionException if no session could be obtained from the pool
+     */
+    public QueueSession getQueueSession() throws NamingException, JMSException, ConnectionException
+    {
+        return (QueueSession) getSession();
+    }
+
+    /**
+     * This method can be called whenever a Topic Session is needed from the pool.
+     *
+     * @return a session of this pool, cast to TopicSession
+     * @throws NamingException if the connection factory cannot be looked up
+     * @throws JMSException if a connection or session cannot be created
+     * @throws ConnectionException if no session could be obtained from the pool
+     */
+    public TopicSession getTopicSession() throws NamingException, JMSException, ConnectionException
+    {
+        return (TopicSession) getSession();
+    }
+
+    /**
+     * This method returns a session to the pool so that it can be handed out
+     * again; the JMS session itself is left open. A session that is not
+     * marked as in use by this pool (never handed out, or already returned)
+     * is ignored, so returning the same session twice cannot put it in the
+     * free list more than once.
+     *
+     * @param connectionToClose The session to be returned to the pool.
+     */
+    public synchronized void closeSession(Session connectionToClose){
+        if (inUseSessions.remove(connectionToClose)) {
+            freeSessions.add(connectionToClose);
+            //Wake up the callers that wait for a free session in getSession.
+            notifyAll();
+        }
+    }
+
+    /**
+     * This method is called when the pool needs to be destroyed. It closes all
+     * sessions and the connection and removes the pool from the container's
+     * poolMap. The connection is still closed and the pool still removed when
+     * closing one of the sessions fails.
+     *
+     * @throws JMSException if a session or the connection cannot be closed
+     */
+    public synchronized void removeSessionPool() throws JMSException
+    {
+        try {
+            closeAll(freeSessions);
+            closeAll(inUseSessions);
+            logger.debug("Emptied the session pool now closing the connection to the factory.");
+        } finally {
+            //Forget every session, also one that failed to close. Per the JMS API
+            //a closed connection needs no separate close of its sessions.
+            freeSessions.clear();
+            inUseSessions.clear();
+            Connection connection = jmsConnection;
+            jmsConnection=null;
+            JmsConnectionPoolContainer.removePool(poolKey);
+            if (connection!=null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Closes every session in the given list. The list itself is not changed.
+     *
+     * @param sessions the sessions to close
+     * @throws JMSException if a session cannot be closed; later ones are then skipped
+     */
+    private void closeAll(ArrayList<Session> sessions) throws JMSException
+    {
+        for (Iterator<Session> i=sessions.iterator();i.hasNext();) {
+            i.next().close();
+        }
+    }
+
+    /**
+     * Gets the total number of sessions in the pool, free and in use.
+     *
+     * @return the session pool size
+     */
+    public synchronized int getSessionsInPool() {
+        return freeSessions.size() + inUseSessions.size();
+    }
+}
Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,167 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.internal.soa.esb.rosetta.pooling;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+
+/**
+ * This is a temporary pooling class until we start using JCA. It is designed to build
+ * a JMS session pool for each ConnectionFactory.
+ *
+ * @author kstam
+ * Date: March 13, 2007
+ */
+public class JmsConnectionPoolContainer{
+    /**
+     * Container of all the pools in the system.
+     */
+    private static final ConcurrentHashMap<Map<String, String>, JmsConnectionPool> poolMap
+        = new ConcurrentHashMap<Map<String, String>, JmsConnectionPool>();
+
+    /**
+     * Returns the pool given a JMSEpr.
+     *
+     * @param jmsEpr supplies the JNDI settings, connection factory and destination type
+     * @return the pool registered for these settings, created if there was none
+     * @throws ConnectionException declared in the signature; nothing in this class throws it here
+     * @throws URISyntaxException presumably raised by the JMSEpr getters (to be confirmed)
+     */
+    public static JmsConnectionPool getPool(JMSEpr jmsEpr) throws ConnectionException, URISyntaxException
+    {
+        return getPool(jmsEpr.getJndiURL(), jmsEpr.getJndiContextFactory(), jmsEpr.getJndiPkgPrefix()
+            , jmsEpr.getConnectionFactory(), jmsEpr.getDestinationType());
+    }
+
+    /**
+     * Returns the pool given the identifiers for the JMS provider. A pool is
+     * created and registered when there is none for these identifiers yet.
+     *
+     * @param jndiURL JNDI provider URL, may be null
+     * @param jndiContextFactory JNDI initial context factory, may be null
+     * @param jndiPkgPrefix JNDI package prefix, may be null
+     * @param connectionFactory JNDI name of the JMS connection factory
+     * @param destinationType JMSEpr.QUEUE_TYPE or JMSEpr.TOPIC_TYPE
+     * @return the single pool registered for these identifiers
+     */
+    public static JmsConnectionPool getPool(String jndiURL, String jndiContextFactory,
+        String jndiPkgPrefix, String connectionFactory, String destinationType)
+    {
+        Map<String,String> poolKey = createPoolKey(jndiURL, jndiContextFactory,
+            jndiPkgPrefix, connectionFactory, destinationType);
+        JmsConnectionPool pool = poolMap.get(poolKey);
+        if (pool == null) {
+            //putIfAbsent is atomic: when two threads get here for the same key
+            //only one pool is registered and both threads return that one.
+            JmsConnectionPool newPool = new JmsConnectionPool(poolKey);
+            pool = poolMap.putIfAbsent(poolKey, newPool);
+            if (pool == null) {
+                pool = newPool;
+            }
+        }
+        return pool;
+    }
+
+    /**
+     * Creates a poolKey using the identifying parameters. Parameters that
+     * are null are left out of the key. The returned map is used as a key of
+     * the poolMap and should not be modified after it has been handed to a pool.
+     *
+     * @param jndiURL stored under JMSEpr.JNDI_URL_TAG
+     * @param jndiContextFactory stored under JMSEpr.JNDI_CONTEXT_FACTORY_TAG
+     * @param jndiPkgPrefix stored under JMSEpr.JNDI_PKG_PREFIX_TAG
+     * @param connectionFactory stored under JMSEpr.CONNECTION_FACTORY_TAG
+     * @param destinationType stored under JMSEpr.DESTINATION_TYPE_TAG
+     * @return a new map holding the non-null parameters
+     */
+    public static Map<String, String> createPoolKey(String jndiURL, String jndiContextFactory,
+        String jndiPkgPrefix, String connectionFactory, String destinationType)
+    {
+        Map<String,String> poolKey = new HashMap<String,String>();
+        if (jndiURL!=null) poolKey.put(JMSEpr.JNDI_URL_TAG, jndiURL);
+        if (jndiContextFactory!=null) poolKey.put(JMSEpr.JNDI_CONTEXT_FACTORY_TAG, jndiContextFactory);
+        if (jndiPkgPrefix!=null) poolKey.put(JMSEpr.JNDI_PKG_PREFIX_TAG, jndiPkgPrefix);
+        if (connectionFactory!=null) poolKey.put(JMSEpr.CONNECTION_FACTORY_TAG, connectionFactory);
+        if (destinationType!=null) poolKey.put(JMSEpr.DESTINATION_TYPE_TAG, destinationType);
+        return poolKey;
+    }
+
+    /**
+     * Gets the number of pools in the container.
+     *
+     * @return the number of registered pools
+     */
+    public static int getNumberOfPools() {
+        return poolMap.size();
+    }
+
+    /**
+     * Removes the poolKey from the poolMap. Nothing happens when the key is
+     * not registered.
+     *
+     * @param poolKey the key of the pool to remove
+     */
+    protected static void removePool(Map<String, String> poolKey) {
+        poolMap.remove(poolKey);
+    }
+
+    /**
+     * Adds this poolKey and pool to the poolMap, unless a pool is already
+     * registered for the key.
+     *
+     * @param poolKey the key to register the pool under
+     * @param pool the pool to register
+     */
+    protected static void addToPool(Map<String, String> poolKey, JmsConnectionPool pool) {
+        poolMap.putIfAbsent(poolKey, pool);
+    }
+
+    /**
+     * Removes all pools from the container. Every pool is asked to shut
+     * down, also when an earlier one failed; the first failure is rethrown
+     * once all pools have been tried.
+     *
+     * @throws JMSException the first failure reported while closing a pool
+     */
+    public static void removeAllPools() throws JMSException{
+        JMSException firstFailure = null;
+        for (JmsConnectionPool pool : poolMap.values()) {
+            try {
+                pool.removeSessionPool();
+            } catch (JMSException e) {
+                //Keep going so that one broken pool does not keep the others open.
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+}
Property changes on: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,7 +26,6 @@
import java.util.Iterator;
import java.util.Properties;
-import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -36,6 +35,7 @@
import javax.naming.NamingException;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -54,7 +54,7 @@
*/
public abstract class NotifyJMS extends NotificationTarget
{
- private Logger log = Logger.getLogger(this.getClass());
+ protected Logger log = Logger.getLogger(this.getClass());
/**
* Abstract method - All classes that extend NotifyJMS must implement it
* according to their own javax.jms.Destination needs (Queue/Topic)
@@ -101,7 +101,7 @@
/**
* The javax.jms.Connection instance used to talk to JMS
*/
- protected Connection m_oConn;
+ protected JmsConnectionPool mPool;
/**
* The javax.jms.Session instance used to talk to JMS
@@ -159,31 +159,8 @@
*/
public void release ()
{
- if (null != m_oSess) try
- {
- m_oSess.close();
- }
- catch (Exception e1)
- {
- log.error(e1.getMessage(), e1);
- }
- if (null != m_oConn) try
- {
- m_oConn.close();
- }
- catch (Exception e2)
- {
- log.error(e2.getMessage(), e2);
- }
- if (null != m_oCtx) try
- {
- m_oCtx.close();
- }
- catch (NamingException ne)
- {
- log.error(ne.getMessage(), ne);
- }
- } // __________________________________
+ mPool.closeSession(m_oSess);
+ }
/**
* Send a JMS message using p_o to fill in the message content and the list
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,14 +26,14 @@
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.NamingException;
-import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
;
@@ -52,7 +52,6 @@
*/
public class NotifyQueues extends NotifyJMS
{
- private Logger log = Logger.getLogger(this.getClass());
/**
* Connection Factory JNDI name.
*/
@@ -67,21 +66,17 @@
*/
public static final String CHILD_QUEUE = "queue";
- public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException
+ public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
{
super(p_oP);
setQueues(p_oP.getChildren(CHILD_QUEUE));
} // __________________________________
- protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException
+ protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
{
try
{
- QueueConnectionFactory qcf = lookupQueueConnectionFactory();
- QueueConnection oQconn = qcf.createQueueConnection();
- QueueSession oQsess = oQconn.createQueueSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
-
+ mPool = JmsConnectionPoolContainer.getPool(null, null, null, CONNECTION_FACTORY, JMSEpr.QUEUE_TYPE);
m_oaMssProd = new MessageProducer[p_oaP.length];
for (int i1 = 0; i1 < p_oaP.length; i1++)
{
@@ -89,36 +84,17 @@
if (null == sAtt)
throw new ConfigurationException("Missing queue jndiName");
Queue oQ = (Queue) m_oCtx.lookup(sAtt);
- m_oaMssProd[i1] = oQsess.createSender(oQ);
+ QueueSession queueSession = mPool.getQueueSession();
+ m_oaMssProd[i1] = queueSession.createSender(oQ);
+ m_oSess = queueSession;
}
-
- m_oConn = oQconn;
- m_oSess = oQsess;
- // m_oConn.start(); Only for incoming messages ???
}
catch (NamingException ex)
{
throw new ConfigurationException(ex);
}
}
-
/**
- * Get the {@link QueueConnectionFactory} to be used for this instance. <p/>
- * Can be overridden for testing.
- *
- * @return The QueueConnectionFactory fro the JNDI context.
- * @throws NamingException
- */
- protected QueueConnectionFactory lookupQueueConnectionFactory ()
- throws NamingException
- {
- // REVIEW: The connection factory name is hardcoded and is the same as
- // that of the topic connection factory.
- // KS: Yes this should come from the configuration, the Queue maybe located on a different server.
- return (QueueConnectionFactory) m_oCtx.lookup(CONNECTION_FACTORY);
- }
-
- /**
* Send a message to all the configured queues.
*/
protected void sendToAll (Message p_oMsg)
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2007-03-14 18:49:42 UTC (rev 10186)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -26,13 +26,14 @@
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.NamingException;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
/**
*
@@ -45,32 +46,18 @@
public static final String CHILD_TOPIC = "topic";
- public NotifyTopics (ConfigTree p_oP) throws ConfigurationException, JMSException
+ public NotifyTopics (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
{
super(p_oP);
setTopics(p_oP.getChildren(CHILD_TOPIC));
} // __________________________________
- protected void setTopics (ConfigTree[] p_oaP) throws ConfigurationException, JMSException
+ protected void setTopics (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
{
// REVIEW: The connection factory name is hardcoded and is the same as
// that of the queue connection factory.
- TopicConnectionFactory qcf = null;
-
- try
- {
- qcf = (TopicConnectionFactory) m_oCtx.lookup(CONNECTION_FACTORY);
- }
- catch (NamingException ex)
- {
- throw new ConfigurationException(ex);
- }
-
- TopicConnection oTconn = qcf.createTopicConnection();
- TopicSession oTsess = oTconn.createTopicSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
-
+ mPool = JmsConnectionPoolContainer.getPool(null, null, null, CONNECTION_FACTORY, JMSEpr.TOPIC_TYPE);
m_oaMssProd = new MessageProducer[p_oaP.length];
try
@@ -80,20 +67,17 @@
String sAtt = p_oaP[i1].getAttribute(ATT_DEST_NAME);
if (null == sAtt) throw new ConfigurationException("Missing topic jndiName");
Topic oT = (Topic) m_oCtx.lookup(sAtt);
- m_oaMssProd[i1] = oTsess.createPublisher(oT);
+ TopicSession topicSession = mPool.getTopicSession();
+ m_oaMssProd[i1] = topicSession.createPublisher(oT);
+ m_oSess = topicSession;
}
}
catch (NamingException ex)
{
throw new ConfigurationException(ex);
}
+ }
- m_oConn = oTconn;
- m_oSess = oTsess;
- // m_oConn.start(); Only for incoming messages ???
-
- } // __________________________________
-
protected void sendToAll (Message p_oMsg)
{
for (int i1 = 0; i1 < m_oaMssProd.length; i1++)
@@ -106,6 +90,7 @@
}
catch (Exception e)
{
+ log.error(e.getMessage(), e);
}
}
release();
Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java 2007-03-14 19:50:44 UTC (rev 10187)
@@ -0,0 +1,120 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.soa.esb.rosetta.pooling;
+
+import javax.jms.Session;
+
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.helpers.NamingContext;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for JmsConnectionPool and JmsConnectionPoolContainer.
+ *
+ * @author kstam
+ */
+public class JmsConnectionPoolingTest {
+
+    /**
+     * Opens and returns sessions on one pool and checks the session and pool
+     * counts, then destroys the pool and checks that it can be used again.
+     * NOTE(review): this opens real queue sessions through JNDI, so it
+     * presumably needs a reachable JMS provider.
+     */
+    @Test
+    public void testPoolAndConnectionCreation()
+    {
+        JmsConnectionPool jmsConnectionPool = null;
+        try {
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            assertEquals(0, jmsConnectionPool.getSessionsInPool());
+            //Open 3 concurrent sessions
+            Session session1 = jmsConnectionPool.getQueueSession();
+            assertEquals(1, jmsConnectionPool.getSessionsInPool());
+            Session session2 = jmsConnectionPool.getQueueSession();
+            assertEquals(2, jmsConnectionPool.getSessionsInPool());
+            Session session3 = jmsConnectionPool.getQueueSession();
+            assertEquals(3, jmsConnectionPool.getSessionsInPool());
+            //Close them
+            jmsConnectionPool.closeSession(session1);
+            jmsConnectionPool.closeSession(session2);
+            jmsConnectionPool.closeSession(session3);
+            assertEquals(3, jmsConnectionPool.getSessionsInPool());
+            //destroy this pool
+            jmsConnectionPool.removeSessionPool();
+            assertEquals(0, jmsConnectionPool.getSessionsInPool());
+            assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
+
+            //Use it again and add one session
+            jmsConnectionPool.getQueueSession();
+            assertEquals(1, jmsConnectionPool.getSessionsInPool());
+            assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
+            //I should be able to remove the entire pool and have it do closing
+            //of the session.
+            jmsConnectionPool.removeSessionPool();
+        } catch (Exception e) {
+            //Print first: the failing assertion throws, so a statement after it never runs.
+            e.printStackTrace();
+            assertTrue("Unexpected exception: " + e, false);
+        }
+    }
+
+    /**
+     * Checks that equal identifiers give the same pool, that different
+     * identifiers give different pools, and that pools can be removed one by
+     * one or all at once.
+     */
+    @Test
+    public void testCreateSecondPool()
+    {
+        try {
+            JmsConnectionPool jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.QUEUE_TYPE);
+            //This should be the same pool
+            assertEquals(1, JmsConnectionPoolContainer.getNumberOfPools());
+
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(NamingContext.JBOSS_PROVIDER_URL, NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY, null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+            //This should be a different pool, so now we should have 2.
+            assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+
+            jmsConnectionPool = JmsConnectionPoolContainer.getPool(null, null, null, "ConnectionFactory", JMSEpr.TOPIC_TYPE);
+            //This should be a different pool, so now we should have 3.
+            assertEquals(3, JmsConnectionPoolContainer.getNumberOfPools());
+
+            //Now lets cleanup after ourselves
+            jmsConnectionPool.removeSessionPool();
+            assertEquals(2, JmsConnectionPoolContainer.getNumberOfPools());
+
+            //Let's clean the rest up with a removeAll.
+            JmsConnectionPoolContainer.removeAllPools();
+            assertEquals(0, JmsConnectionPoolContainer.getNumberOfPools());
+        } catch (Exception e) {
+            //Print first: the failing assertion throws, so a statement after it never runs.
+            e.printStackTrace();
+            assertTrue("Unexpected exception: " + e, false);
+        }
+    }
+}
Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/rosetta/pooling/JmsConnectionPoolingTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list