[jboss-svn-commits] JBL Code SVN: r10291 - in labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb: rosetta/pooling and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Mar 16 18:37:12 EDT 2007
Author: kurt.stam at jboss.com
Date: 2007-03-16 18:37:12 -0400 (Fri, 16 Mar 2007)
New Revision: 10291
Modified:
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
Log:
Hooking up the JMS connection pool (JmsConnectionPool) in JmsCourier.
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2007-03-16 22:27:37 UTC (rev 10290)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2007-03-16 22:37:12 UTC (rev 10291)
@@ -27,19 +27,13 @@
import java.net.URISyntaxException;
import java.util.List;
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
@@ -47,6 +41,9 @@
import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -115,28 +112,14 @@
_logger.error(e.getMessage(), e);
}
- if (null != _jmsSession) try
+ if (null != _jmsSession)
{
- _jmsSession.close();
+ _pool.closeSession(_jmsSession);
}
- catch (JMSException e)
- {
- _logger.error(e.getMessage(), e);
- }
- if (null != _jmsConnection) try
- {
- _jmsConnection.close();
- }
- catch (JMSException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
_messageProducer = null;
_messageConsumer = null;
_jmsSession = null;
- _jmsConnection = null;
} // ________________________________
/**
@@ -150,42 +133,46 @@
*/
public boolean deliver (Message message) throws CourierException
{
- if (_isReceiver)
- throw new CourierException("This is a read-only Courier");
-
- if (null == message) return false;
- if (null == _messageProducer) try
- {
- createMessageProducer();
- }
- catch (Exception e)
- {
- throw new CourierException(e);
- }
-
- while (null != _messageProducer)
- {
- try
- {
- // obtain Serializable version of arg0 and package it in a jms
- // ObjectMessage
- ObjectMessage msg = _jmsSession.createObjectMessage(Util
- .serialize(message));
- for (KeyValuePair kvp : _messageProperties)
- msg.setStringProperty(kvp.getKey(), kvp.getValue());
- sendMessage(msg);
- return true;
- }
- catch (JMSException e)
- {
- jmsConnectRetry(e);
- }
- catch (Exception e)
- {
- throw new CourierException(e);
- }
- }
- return false;
+ try {
+ if (_isReceiver)
+ throw new CourierException("This is a read-only Courier");
+
+ if (null == message) return false;
+ if (null == _messageProducer) try
+ {
+ createMessageProducer();
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+
+ while (null != _messageProducer)
+ {
+ try
+ {
+ // obtain Serializable version of arg0 and package it in a jms
+ // ObjectMessage
+ ObjectMessage msg = _jmsSession.createObjectMessage(Util
+ .serialize(message));
+ for (KeyValuePair kvp : _messageProperties)
+ msg.setStringProperty(kvp.getKey(), kvp.getValue());
+ sendMessage(msg);
+ return true;
+ }
+ catch (JMSException e)
+ {
+ jmsConnectRetry(e);
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ }
+ return false;
+ } finally {
+ cleanup();
+ }
} // ________________________________
/**
@@ -205,7 +192,6 @@
private void jmsConnectRetry (Exception exc)
{
_logger.error("JMS error. Attempting JMS reconnect.", exc);
- _jmsConnection = null;
_jmsSession = null;
_messageProducer = null;
_messageConsumer = null;
@@ -251,54 +237,42 @@
if (Util.isNullString(sFactoryClass))
sFactoryClass = "ConnectionFactory";
- Object tmp = oJndiCtx.lookup(sFactoryClass);
-
String sType = _epr.getDestinationType();
- if (JMSEpr.QUEUE_TYPE.equals(sType))
- {
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
- QueueConnection qConn = qcf.createQueueConnection();
- QueueSession qSess = qConn.createQueueSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
+ _pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiConnectionFactory, sJndiPkgPrefix, sFactoryClass, sType);
+
+ if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+ QueueSession qSess = _pool.getQueueSession();
+ _jmsSession = qSess;
javax.jms.Queue queue = null;
- try
- {
+ try {
queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
.getDestinationName());
- }
- catch (NamingException ne)
- {
+ } catch (NamingException ne) {
queue = qSess.createQueue(_epr.getDestinationName());
}
- _jmsConnection = qConn;
- _jmsSession = qSess;
_messageProducer = qSess.createSender(queue);
- }
- else
- if (JMSEpr.TOPIC_TYPE.equals(sType))
+ } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+ TopicSession tSess = _pool.getTopicSession();
+ _jmsSession = tSess;
+ Topic topic = null;
+ try
{
- TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
- TopicConnection tConn = tcf.createTopicConnection();
- TopicSession tSess = tConn.createTopicSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- Topic topic = null;
- try
- {
- topic = (Topic) oJndiCtx.lookup(_epr
- .getDestinationName());
- }
- catch (NamingException ne)
- {
- topic = tSess.createTopic(_epr.getDestinationName());
- }
- _jmsConnection = tConn;
- _jmsSession = tSess;
- _messageProducer = tSess.createPublisher(topic);
+ topic = (Topic) oJndiCtx.lookup(_epr
+ .getDestinationName());
}
- else
- throw new CourierException("Unknown destination type");
- setConnExceptionListener();
+ catch (NamingException ne)
+ {
+ topic = tSess.createTopic(_epr.getDestinationName());
+ }
+ _messageProducer = tSess.createPublisher(topic);
+ } else {
+ throw new CourierException("Unknown destination type");
+ }
}
+ catch (ConnectionException vex)
+ {
+ throw new CourierException(vex);
+ }
catch (JMSException ex)
{
_logger.error("Error from JMS system.", ex);
@@ -330,83 +304,87 @@
public Message pickup (long millis) throws CourierException
{
- if (!_isReceiver)
- throw new CourierException("This is an outgoing-only Courier");
- if (millis < 1)
- throw new IllegalArgumentException("Timeout millis must be > 0");
- if (null == _messageConsumer) try
- {
- createMessageConsumer();
- }
- catch (Exception e)
- {
- try
- {
- Thread.sleep(1000); // TODO magic number
- }
- catch (InterruptedException eI)
- {/* OK do nothing */
- }
- throw new CourierException("Unable to create Message Consumer", e);
- }
-
- javax.jms.Message jmsMessage = null;
- while (null != _messageConsumer)
- {
- try
- {
- jmsMessage = _messageConsumer.receive(millis);
- break;
- }
- catch (JMSException e)
- {
- jmsConnectRetry(e);
- }
- catch (Exception e)
- {
- throw new CourierException(e);
- }
- }
- if (null == jmsMessage) return null;
-
- if (!(jmsMessage instanceof ObjectMessage))
- {
- _logger.error("Unsupported JMS message type: " + jmsMessage
- .getClass().getName());
- return null;
- }
- try
- {
- Serializable obj = (Serializable) ((ObjectMessage) jmsMessage)
- .getObject();
- return Util.deserialize(obj);
- }
- catch (JMSException e1)
- {
- _logger.error("Failed to read Serialized Object from JMS message.",
- e1);
- return null;
- }
- catch (ClassCastException e2)
- {
- _logger
- .error(
- "Object in JMS message is not a org.jboss.soa.esb.message.Message",
- e2);
- }
- catch (IOException e3)
- {
- _logger.error("Object in JMS message is not a Serializeable", e3);
- }
- catch (ParserConfigurationException e4)
- {
- _logger.error("Object in JMS message has invalid XML", e4);
- }
- catch (SAXException e5)
- {
- _logger.error("Object in JMS message has invalid XML", e5);
- }
- return null;
+ try {
+ if (!_isReceiver)
+ throw new CourierException("This is an outgoing-only Courier");
+ if (millis < 1)
+ throw new IllegalArgumentException("Timeout millis must be > 0");
+ if (null == _messageConsumer) try
+ {
+ createMessageConsumer();
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ Thread.sleep(1000); // TODO magic number
+ }
+ catch (InterruptedException eI)
+ {/* OK do nothing */
+ }
+ throw new CourierException("Unable to create Message Consumer", e);
+ }
+
+ javax.jms.Message jmsMessage = null;
+ while (null != _messageConsumer)
+ {
+ try
+ {
+ jmsMessage = _messageConsumer.receive(millis);
+ break;
+ }
+ catch (JMSException e)
+ {
+ jmsConnectRetry(e);
+ }
+ catch (Exception e)
+ {
+ throw new CourierException(e);
+ }
+ }
+ if (null == jmsMessage) return null;
+
+ if (!(jmsMessage instanceof ObjectMessage))
+ {
+ _logger.error("Unsupported JMS message type: " + jmsMessage
+ .getClass().getName());
+ return null;
+ }
+ try
+ {
+ Serializable obj = (Serializable) ((ObjectMessage) jmsMessage)
+ .getObject();
+ return Util.deserialize(obj);
+ }
+ catch (JMSException e1)
+ {
+ _logger.error("Failed to read Serialized Object from JMS message.",
+ e1);
+ return null;
+ }
+ catch (ClassCastException e2)
+ {
+ _logger
+ .error(
+ "Object in JMS message is not a org.jboss.soa.esb.message.Message",
+ e2);
+ }
+ catch (IOException e3)
+ {
+ _logger.error("Object in JMS message is not a Serializeable", e3);
+ }
+ catch (ParserConfigurationException e4)
+ {
+ _logger.error("Object in JMS message has invalid XML", e4);
+ }
+ catch (SAXException e5)
+ {
+ _logger.error("Object in JMS message has invalid XML", e5);
+ }
+ return null;
+ } finally {
+ cleanup();
+ }
} // ________________________________
private void createMessageConsumer () throws CourierException, ConfigurationException, MalformedEPRException
@@ -422,56 +400,40 @@
if (null == oJndiCtx)
throw new ConfigurationException(
"Unable fo obtain jndi context <" + sJndiURL + "," + sJndiConnectionFactory + "," + sJndiPkgPrefix + ">");
-
+
String sFactoryClass = _epr.getConnectionFactory();
if (Util.isNullString(sFactoryClass))
sFactoryClass = "ConnectionFactory";
+
+ String sType = _epr.getDestinationType();
+ _pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiConnectionFactory, sJndiPkgPrefix, sFactoryClass, sType);
- Object tmp = oJndiCtx.lookup(sFactoryClass);
-
- String sType = _epr.getDestinationType();
- if (JMSEpr.QUEUE_TYPE.equals(sType))
- {
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
- QueueConnection qConn = qcf.createQueueConnection();
- QueueSession qSess = qConn.createQueueSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
+ if (JMSEpr.QUEUE_TYPE.equals(sType)) {
+ QueueSession qSess = _pool.getQueueSession();
+ _jmsSession = qSess;
javax.jms.Queue queue = null;
- try
- {
+ try {
queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
.getDestinationName());
- }
- catch (NamingException ne)
- {
+ } catch (NamingException ne) {
queue = qSess.createQueue(_epr.getDestinationName());
}
- _jmsConnection = qConn;
- _jmsSession = qSess;
_messageConsumer = qSess.createReceiver(queue, _epr
.getMessageSelector());
- qConn.start();
- }
- else
- {
- if (JMSEpr.TOPIC_TYPE.equals(sType))
- {
- TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
- TopicConnection tConn = tcf.createTopicConnection();
- TopicSession tSess = tConn.createTopicSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
+ } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+ TopicSession tSess = _pool.getTopicSession();
+ _jmsSession = tSess;
Topic topic = tSess.createTopic(_epr.getDestinationName());
- _jmsConnection = tConn;
- _jmsSession = tSess;
_messageConsumer = tSess.createConsumer(topic, _epr
.getMessageSelector());
- tConn.start();
- }
- else
- throw new CourierException("Unknown destination type");
- }
- setConnExceptionListener();
+ } else {
+ throw new CourierException("Unknown destination type");
+ }
}
+ catch (ConnectionException cex)
+ {
+ throw new CourierException(cex);
+ }
catch (JMSException ex)
{
_logger.error("Error from JMS system.", ex);
@@ -498,19 +460,6 @@
}
} // ________________________________
-
- protected void setConnExceptionListener() throws JMSException
- {
- _jmsConnection.setExceptionListener
- (new ExceptionListener()
- {
- public void onException(JMSException arg0)
- {
- cleanup();
- }
-
- });
- } //________________________________
long _sleepForRetries = 3000; // milliseconds
@@ -523,8 +472,6 @@
protected String _messageSelector;
- protected Connection _jmsConnection;
-
protected Session _jmsSession;
protected MessageProducer _messageProducer;
@@ -532,5 +479,7 @@
protected MessageConsumer _messageConsumer;
protected List<KeyValuePair> _messageProperties;
+
+ protected JmsConnectionPool _pool;
}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2007-03-16 22:27:37 UTC (rev 10290)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2007-03-16 22:37:12 UTC (rev 10291)
@@ -52,7 +52,7 @@
public class JmsConnectionPool
{
/** Maximum number of Sessions that will be created in this pool */
- private int MAX_SESSIONS=10; //TODO Make this manageable
+ private int MAX_SESSIONS=20; //TODO Make this manageable
/** Number of free sessions in the pool that can be given out */
private ArrayList<Session> freeSessions = new ArrayList<Session>();
/** Number of session that are currently in use */
@@ -100,6 +100,7 @@
TopicConnectionFactory factory = (TopicConnectionFactory) factoryConnection;
jmsConnection = factory.createTopicConnection();
}
+ jmsConnection.start();
jndiContext.close(); //TODO Make sure this is ok for MQSeries.
}
//Create a new Session
@@ -207,6 +208,7 @@
}
logger.debug("Emptied the session pool now closing the connection to the factory.");
if (jmsConnection!=null) {
+ jmsConnection.stop();
jmsConnection.close();
jmsConnection=null;
}
More information about the jboss-svn-commits
mailing list