[jboss-svn-commits] JBL Code SVN: r10291 - in labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb: rosetta/pooling and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Mar 16 18:37:12 EDT 2007


Author: kurt.stam at jboss.com
Date: 2007-03-16 18:37:12 -0400 (Fri, 16 Mar 2007)
New Revision: 10291

Modified:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
Log:
Hooking up the JMS Connection Pool in the JMSCourier. 

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2007-03-16 22:27:37 UTC (rev 10290)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2007-03-16 22:37:12 UTC (rev 10291)
@@ -27,19 +27,13 @@
 import java.net.URISyntaxException;
 import java.util.List;
 
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
 import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.naming.Context;
@@ -47,6 +41,9 @@
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -115,28 +112,14 @@
 			_logger.error(e.getMessage(), e);
 		}
 
-		if (null != _jmsSession) try
+		if (null != _jmsSession)
 		{
-			_jmsSession.close();
+			_pool.closeSession(_jmsSession);
 		}
-		catch (JMSException e)
-		{
-			_logger.error(e.getMessage(), e);
-		}
 
-		if (null != _jmsConnection) try
-		{
-			_jmsConnection.close();
-		}
-		catch (JMSException e)
-		{
-			_logger.error(e.getMessage(), e);
-		}
-
 		_messageProducer = null;
 		_messageConsumer = null;
 		_jmsSession = null;
-		_jmsConnection = null;
 	} // ________________________________
 
 	/**
@@ -150,42 +133,46 @@
 	 */
 	public boolean deliver (Message message) throws CourierException
 	{
-		if (_isReceiver)
-			throw new CourierException("This is a read-only Courier");
-
-		if (null == message) return false;
-		if (null == _messageProducer) try
-		{
-			createMessageProducer();
-		}
-		catch (Exception e)
-		{
-			throw new CourierException(e);
-		}
-
-		while (null != _messageProducer)
-		{
-			try
-			{
-				// obtain Serializable version of arg0 and package it in a jms
-				// ObjectMessage
-				ObjectMessage msg = _jmsSession.createObjectMessage(Util
-						.serialize(message));
-				for (KeyValuePair kvp : _messageProperties)
-					msg.setStringProperty(kvp.getKey(), kvp.getValue());
-				sendMessage(msg);
-				return true;
-			}
-			catch (JMSException e)
-			{
-				jmsConnectRetry(e);
-			}
-			catch (Exception e)
-			{
-				throw new CourierException(e);
-			}
-		}
-		return false;
+        try {
+    		if (_isReceiver)
+    			throw new CourierException("This is a read-only Courier");
+    
+    		if (null == message) return false;
+    		if (null == _messageProducer) try
+    		{
+    			createMessageProducer();
+    		}
+    		catch (Exception e)
+    		{
+    			throw new CourierException(e);
+    		}
+    
+    		while (null != _messageProducer)
+    		{
+    			try
+    			{
+    				// obtain Serializable version of arg0 and package it in a jms
+    				// ObjectMessage
+    				ObjectMessage msg = _jmsSession.createObjectMessage(Util
+    						.serialize(message));
+    				for (KeyValuePair kvp : _messageProperties)
+    					msg.setStringProperty(kvp.getKey(), kvp.getValue());
+    				sendMessage(msg);
+    				return true;
+    			}
+    			catch (JMSException e)
+    			{
+    				jmsConnectRetry(e);
+    			}
+    			catch (Exception e)
+    			{
+    				throw new CourierException(e);
+    			}
+    		}
+    		return false;
+        } finally {
+            cleanup();
+        }
 	} // ________________________________
 
 	/**
@@ -205,7 +192,6 @@
 	private void jmsConnectRetry (Exception exc)
 	{
 		_logger.error("JMS error.  Attempting JMS reconnect.", exc);
-		_jmsConnection = null;
 		_jmsSession = null;
 		_messageProducer = null;
 		_messageConsumer = null;
@@ -251,54 +237,42 @@
 			if (Util.isNullString(sFactoryClass))
 				sFactoryClass = "ConnectionFactory";
 
-			Object tmp = oJndiCtx.lookup(sFactoryClass);
-
 			String sType = _epr.getDestinationType();
-			if (JMSEpr.QUEUE_TYPE.equals(sType))
-			{
-				QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
-				QueueConnection qConn = qcf.createQueueConnection();
-				QueueSession qSess = qConn.createQueueSession(false,
-						QueueSession.AUTO_ACKNOWLEDGE);
+             _pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiConnectionFactory, sJndiPkgPrefix, sFactoryClass, sType);
+                
+			if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+				QueueSession qSess = _pool.getQueueSession();
+                _jmsSession = qSess;
 				javax.jms.Queue queue = null;
-				try
-				{
+				try {
 					queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
 							.getDestinationName());
-				}
-				catch (NamingException ne)
-				{
+				} catch (NamingException ne) {
 					queue = qSess.createQueue(_epr.getDestinationName());
 				}
-				_jmsConnection = qConn;
-				_jmsSession = qSess;
 				_messageProducer = qSess.createSender(queue);
-			}
-			else
-				if (JMSEpr.TOPIC_TYPE.equals(sType))
+			} else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+				TopicSession tSess = _pool.getTopicSession();
+                _jmsSession = tSess;
+				Topic topic = null;
+				try
 				{
-					TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
-					TopicConnection tConn = tcf.createTopicConnection();
-					TopicSession tSess = tConn.createTopicSession(false,
-							TopicSession.AUTO_ACKNOWLEDGE);
-					Topic topic = null;
-					try
-					{
-						topic = (Topic) oJndiCtx.lookup(_epr
-								.getDestinationName());
-					}
-					catch (NamingException ne)
-					{
-						topic = tSess.createTopic(_epr.getDestinationName());
-					}
-					_jmsConnection = tConn;
-					_jmsSession = tSess;
-					_messageProducer = tSess.createPublisher(topic);
+					topic = (Topic) oJndiCtx.lookup(_epr
+							.getDestinationName());
 				}
-				else
-					throw new CourierException("Unknown destination type");
-			setConnExceptionListener();
+				catch (NamingException ne)
+				{
+					topic = tSess.createTopic(_epr.getDestinationName());
+				}
+				_messageProducer = tSess.createPublisher(topic);
+			} else {
+				throw new CourierException("Unknown destination type");
+            }
 		}
+        catch (ConnectionException vex)
+        {
+            throw new CourierException(vex);
+        }
 		catch (JMSException ex)
 		{
 			_logger.error("Error from JMS system.", ex);
@@ -330,83 +304,87 @@
 
 	public Message pickup (long millis) throws CourierException
 	{
-		if (!_isReceiver)
-			throw new CourierException("This is an outgoing-only Courier");
-		if (millis < 1)
-			throw new IllegalArgumentException("Timeout millis must be > 0");
-		if (null == _messageConsumer) try
-		{
-			createMessageConsumer();
-		}
-		catch (Exception e)
-		{
-			try
-			{
-				Thread.sleep(1000); // TODO magic number
-			}
-			catch (InterruptedException eI)
-			{/* OK do nothing */
-			}
-			throw new CourierException("Unable to create Message Consumer", e);
-		}
-
-		javax.jms.Message jmsMessage = null;
-		while (null != _messageConsumer)
-		{
-			try
-			{
-				jmsMessage = _messageConsumer.receive(millis);
-				break;
-			}
-			catch (JMSException e)
-			{
-				jmsConnectRetry(e);
-			}
-			catch (Exception e)
-			{
-				throw new CourierException(e);
-			}
-		}
-		if (null == jmsMessage) return null;
-
-		if (!(jmsMessage instanceof ObjectMessage))
-		{
-			_logger.error("Unsupported JMS message type: " + jmsMessage
-					.getClass().getName());
-			return null;
-		}
-		try
-		{
-			Serializable obj = (Serializable) ((ObjectMessage) jmsMessage)
-					.getObject();
-			return Util.deserialize(obj);
-		}
-		catch (JMSException e1)
-		{
-			_logger.error("Failed to read Serialized Object from JMS message.",
-					e1);
-			return null;
-		}
-		catch (ClassCastException e2)
-		{
-			_logger
-					.error(
-							"Object in JMS message is not a org.jboss.soa.esb.message.Message",
-							e2);
-		}
-		catch (IOException e3)
-		{
-			_logger.error("Object in JMS message is not a Serializeable", e3);
-		}
-		catch (ParserConfigurationException e4)
-		{
-			_logger.error("Object in JMS message has invalid XML", e4);
-		}
-		catch (SAXException e5)
-		{
-			_logger.error("Object in JMS message has invalid XML", e5);
-		}
-		return null;
+        try {
+    		if (!_isReceiver)
+    			throw new CourierException("This is an outgoing-only Courier");
+    		if (millis < 1)
+    			throw new IllegalArgumentException("Timeout millis must be > 0");
+    		if (null == _messageConsumer) try
+    		{
+    			createMessageConsumer();
+    		}
+    		catch (Exception e)
+    		{
+    			try
+    			{
+    				Thread.sleep(1000); // TODO magic number
+    			}
+    			catch (InterruptedException eI)
+    			{/* OK do nothing */
+    			}
+    			throw new CourierException("Unable to create Message Consumer", e);
+    		}
+    
+    		javax.jms.Message jmsMessage = null;
+    		while (null != _messageConsumer)
+    		{
+    			try
+    			{
+    				jmsMessage = _messageConsumer.receive(millis);
+    				break;
+    			}
+    			catch (JMSException e)
+    			{
+    				jmsConnectRetry(e);
+    			}
+    			catch (Exception e)
+    			{
+    				throw new CourierException(e);
+    			}
+    		}
+    		if (null == jmsMessage) return null;
+    
+    		if (!(jmsMessage instanceof ObjectMessage))
+    		{
+    			_logger.error("Unsupported JMS message type: " + jmsMessage
+    					.getClass().getName());
+    			return null;
+    		}
+    		try
+    		{
+    			Serializable obj = (Serializable) ((ObjectMessage) jmsMessage)
+    					.getObject();
+    			return Util.deserialize(obj);
+    		}
+    		catch (JMSException e1)
+    		{
+    			_logger.error("Failed to read Serialized Object from JMS message.",
+    					e1);
+    			return null;
+    		}
+    		catch (ClassCastException e2)
+    		{
+    			_logger
+    					.error(
+    							"Object in JMS message is not a org.jboss.soa.esb.message.Message",
+    							e2);
+    		}
+    		catch (IOException e3)
+    		{
+    			_logger.error("Object in JMS message is not a Serializeable", e3);
+    		}
+    		catch (ParserConfigurationException e4)
+    		{
+    			_logger.error("Object in JMS message has invalid XML", e4);
+    		}
+    		catch (SAXException e5)
+    		{
+    			_logger.error("Object in JMS message has invalid XML", e5);
+    		}
+    		return null;
+        } finally {
+            cleanup();
+        }
 	} // ________________________________
 
 	private void createMessageConsumer () throws CourierException, ConfigurationException, MalformedEPRException
@@ -422,56 +400,40 @@
 			if (null == oJndiCtx)
 				throw new ConfigurationException(
 						"Unable fo obtain jndi context <" + sJndiURL + "," + sJndiConnectionFactory + "," + sJndiPkgPrefix + ">");
-	
+			
 			String sFactoryClass = _epr.getConnectionFactory();
 			if (Util.isNullString(sFactoryClass))
 				sFactoryClass = "ConnectionFactory";
+            
+            String sType = _epr.getDestinationType();
+            _pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiConnectionFactory, sJndiPkgPrefix, sFactoryClass, sType);
 	
-			Object tmp = oJndiCtx.lookup(sFactoryClass);
-	
-			String sType = _epr.getDestinationType();
-			if (JMSEpr.QUEUE_TYPE.equals(sType))
-			{
-				QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
-				QueueConnection qConn = qcf.createQueueConnection();
-				QueueSession qSess = qConn.createQueueSession(false,
-						QueueSession.AUTO_ACKNOWLEDGE);
+			if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+				QueueSession qSess = _pool.getQueueSession();
+                _jmsSession = qSess;
 				javax.jms.Queue queue = null;
-				try
-				{
+				try {
 					queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
 							.getDestinationName());
-				}
-				catch (NamingException ne)
-				{
+				} catch (NamingException ne) {
 					queue = qSess.createQueue(_epr.getDestinationName());
 				}
-				_jmsConnection = qConn;
-				_jmsSession = qSess;
 				_messageConsumer = qSess.createReceiver(queue, _epr
 						.getMessageSelector());
-				qConn.start();
-			}
-			else
-			{
-				if (JMSEpr.TOPIC_TYPE.equals(sType))
-				{
-					TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
-					TopicConnection tConn = tcf.createTopicConnection();
-					TopicSession tSess = tConn.createTopicSession(false,
-							TopicSession.AUTO_ACKNOWLEDGE);
+			} else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+					TopicSession tSess = _pool.getTopicSession();
+                    _jmsSession = tSess;
 					Topic topic = tSess.createTopic(_epr.getDestinationName());
-					_jmsConnection = tConn;
-					_jmsSession = tSess;
 					_messageConsumer = tSess.createConsumer(topic, _epr
 							.getMessageSelector());
-					tConn.start();
-				}
-				else
-					throw new CourierException("Unknown destination type");
-			}
-			setConnExceptionListener();
+			} else {
+				throw new CourierException("Unknown destination type");
+            }
 		}
+        catch (ConnectionException cex)
+        {
+            throw new CourierException(cex);
+        }
 		catch (JMSException ex)
 		{
 			_logger.error("Error from JMS system.", ex);
@@ -498,19 +460,6 @@
         }
 
 	} // ________________________________
-	
-	protected void setConnExceptionListener() throws JMSException
-	{
-		_jmsConnection.setExceptionListener
-		(new ExceptionListener()
-		{
-			public void onException(JMSException arg0) 
-			{
-				cleanup();
-			}
-			
-		});
-	} //________________________________
 
 
 	long _sleepForRetries = 3000; // milliseconds
@@ -523,8 +472,6 @@
 
 	protected String _messageSelector;
 
-	protected Connection _jmsConnection;
-
 	protected Session _jmsSession;
 
 	protected MessageProducer _messageProducer;
@@ -532,5 +479,7 @@
 	protected MessageConsumer _messageConsumer;
 
 	protected List<KeyValuePair> _messageProperties;
+    
+    protected JmsConnectionPool _pool;
         
 }

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2007-03-16 22:27:37 UTC (rev 10290)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2007-03-16 22:37:12 UTC (rev 10291)
@@ -52,7 +52,7 @@
 public class JmsConnectionPool
 {
     /** Maximum number of Sessions that will be created in this pool */
-    private int MAX_SESSIONS=10;    //TODO Make this manageable
+    private int MAX_SESSIONS=20;    //TODO Make this manageable
     /** Number of free sessions in the pool that can be given out */
     private ArrayList<Session> freeSessions = new ArrayList<Session>();
     /** Number of session that are currently in use */
@@ -100,6 +100,7 @@
                 TopicConnectionFactory factory = (TopicConnectionFactory) factoryConnection;
                 jmsConnection = factory.createTopicConnection();
             }
+            jmsConnection.start();
             jndiContext.close(); //TODO Make sure this is ok for MQSeries.
         }
         //Create a new Session
@@ -207,6 +208,7 @@
         }
         logger.debug("Emptied the session pool now closing the connection to the factory.");
         if (jmsConnection!=null) {
+            jmsConnection.stop();
             jmsConnection.close();
             jmsConnection=null;
         }




More information about the jboss-svn-commits mailing list