[jboss-svn-commits] JBL Code SVN: r7514 - labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 9 13:50:29 EST 2006
Author: estebanschifman
Date: 2006-11-09 13:50:27 -0500 (Thu, 09 Nov 2006)
New Revision: 7514
Modified:
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
Log:
Implement receiver part of JmsCourier
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2006-11-09 18:41:32 UTC (rev 7513)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2006-11-09 18:50:27 UTC (rev 7514)
@@ -22,6 +22,8 @@
package org.jboss.internal.soa.esb.couriers;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import javax.jms.Connection;
@@ -39,6 +41,7 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -47,6 +50,7 @@
import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
public class JmsCourier implements PickUpCourier, DeliverCourier
{
@@ -71,11 +75,9 @@
_epr = epr;
_sleepForRetries = 3000;
- try
- {
- _messageProperties = Util.propertiesFromSelector(_epr.getMessageSelector());
- }
- catch (Exception e) { throw new CourierException(e); }
+ if (! _isReceiver)
+ try { _messageProperties = Util.propertiesFromSelector(_epr.getMessageSelector()); }
+ catch (Exception e) { throw new CourierException(e); }
} //________________________________
@@ -84,22 +86,23 @@
if (null != _messageProducer)
try { _messageProducer.close(); }
catch (JMSException e) {/* OK do nothing */ }
- _messageProducer = null;
if (null != _messageConsumer)
try { _messageConsumer.close(); }
catch (JMSException e) {/* OK do nothing */ }
- _messageConsumer = null;
if (null != _jmsSession)
try { _jmsSession.close(); }
catch (JMSException e) {/* OK do nothing */ }
- _jmsSession = null;
if (null != _jmsConnection)
try { _jmsConnection.close(); }
catch (JMSException e) {/* OK do nothing */ }
- _jmsConnection = null;
+
+ _messageProducer = null;
+ _messageConsumer = null;
+ _jmsSession = null;
+ _jmsConnection = null;
} //________________________________
/**
@@ -119,7 +122,7 @@
try { createMessageProducer();}
catch (Exception e) {throw new CourierException(e); }
- while (null!=_messageProducer)
+ while (null!=_messageProducer)
{
try
{
@@ -130,20 +133,12 @@
sendMessage(msg);
return true;
}
- catch (JMSException e) { jmsRetryDeliver(e); }
+ catch (JMSException e) { jmsConnectRetry(e); }
catch (Exception e) { throw new CourierException(e);}
}
return false;
} //________________________________
- public Message pickUp(long millis) throws CourierException
- {
- if (! _isReceiver)
- throw new CourierException("This is an outgoing-only Courier");
- // TODO Auto-generated method stub
- return null;
- }
-
/**
* send/publish a javax.jms.ObjectMessage (that will contain the serialized ESB Message)
* @param jmsMessage
@@ -156,17 +151,24 @@
_messageProducer.send(jmsMessage);
} //________________________________
- private void jmsRetryDeliver(Exception exc)
+ private void jmsConnectRetry(Exception exc)
{
_logger.error("JMS error. Attempting JMS reconnect.", exc);
_jmsConnection = null;
_jmsSession = null;
_messageProducer= null;
+ _messageConsumer= null;
for (int i1 = 0; i1 < 5; i1++)
{
// try to reconnect to the queue
- try { createMessageProducer(); }
+ try
+ {
+ if (_isReceiver)
+ createMessageConsumer();
+ else
+ createMessageProducer();
+ }
catch (Exception e)
{
_logger.error("Reconnecting to JMS", e);
@@ -182,7 +184,6 @@
protected void createMessageProducer() throws Exception
{
-
String sJndiType = _epr.getJndiType();
if (Util.isNullString(sJndiType))
sJndiType = "jboss";
@@ -206,7 +207,7 @@
QueueSession qSess = qConn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
_jmsConnection = qConn;
_jmsSession = qSess;
- _messageProducer = qSess.createProducer(queue);
+ _messageProducer = qSess.createSender(queue);
}
else if(JMSEpr.TOPIC_TYPE.equals(sType))
{
@@ -223,7 +224,107 @@
} //________________________________
- protected boolean _isReceiver;
+ public Message pickUp(long millis) throws CourierException // Makes one receive(millis) call on the JMS consumer and returns the ESB Message deserialized from the received ObjectMessage, or null when nothing usable was obtained
+ { // @param millis timeout handed to receive(); must be >= 1   @throws CourierException if this courier is not a receiver, or on non-JMS failures while connecting/receiving
+ if (! _isReceiver) // couriers not flagged as receivers may not pick up
+ throw new CourierException("This is an outgoing-only Courier");
+ if (millis < 1) // zero and negative timeouts are rejected up front (unchecked exception, not CourierException)
+ throw new IllegalArgumentException("Timeout millis must be > 0");
+ if (null==_messageConsumer) // consumer is created lazily on first use
+ try { createMessageConsumer();}
+ catch (Exception e) {throw new CourierException(e); } // any setup failure is wrapped for the caller
+
+ javax.jms.Message jmsMessage = null;
+ while (null!=_messageConsumer) // jmsConnectRetry clears _messageConsumer and tries to recreate it; if it is still null afterwards this loop ends
+ {
+ try
+ {
+ jmsMessage = _messageConsumer.receive(millis);
+ break; // one receive call that did not throw is enough; jmsMessage may still be null here
+ }
+ catch (JMSException e) { jmsConnectRetry(e); } // JMS-level failure: attempt a reconnect, then loop to receive again
+ catch (Exception e) { throw new CourierException(e);} // anything else is not retried
+ }
+ if (null == jmsMessage) // nothing was received (or no consumer was available)
+ return null;
+
+ if (!(jmsMessage instanceof ObjectMessage)) // only ObjectMessage payloads are handled; other types are logged and dropped
+ {
+ _logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+ return null;
+ }
+ try
+ {
+ Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject(); // payload carried by the JMS message
+ return Util.deserialize(obj); // presumably rebuilds the ESB Message from its serialized form -- Util.deserialize is not visible here, confirm
+ }
+ catch (JMSException e1) // getObject() failed
+ { _logger.error("Failed to read Serialized Object from JMS message.", e1);
+ return null;
+ }
+ catch (ClassCastException e2) // the remaining handlers only log; control falls through to the final return null
+ { _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+ }
+ catch (IOException e3)
+ { _logger.error("Object in JMS message is not a Serializeable", e3);
+ }
+ catch (ParserConfigurationException e4)
+ { _logger.error("Object in JMS message has invalid XML", e4);
+ }
+ catch (SAXException e5)
+ { _logger.error("Object in JMS message has invalid XML", e5);
+ }
+ return null; // reached only after one of the logged, non-returning failures above
+ } //________________________________
+
+ protected void createMessageConsumer() throws Exception // Looks up the connection factory and destination named by _epr via JNDI, opens a connection and session, creates a consumer using the EPR's message selector, and starts the connection
+ { // On success _jmsConnection, _jmsSession and _messageConsumer are set; @throws Exception on any JNDI/JMS failure, CourierException for an unrecognized destination type
+ String sJndiType = _epr.getJndiType();
+ if (Util.isNullString(sJndiType)) // fall back to "jboss" when the EPR gives no JNDI type
+ sJndiType = "jboss";
+ String sJndiURL = _epr.getJndiURL();
+ if (Util.isNullString(sJndiURL)) // fall back to "localhost" when the EPR gives no JNDI URL
+ sJndiURL = "localhost";
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+ String sFactoryClass = _epr.getConnectionFactory(); // despite the variable name, this value is used as a JNDI lookup name below
+ if (Util.isNullString(sFactoryClass))
+ sFactoryClass = "ConnectionFactory";
+
+ Object tmp = oJndiCtx.lookup(sFactoryClass); // cast to the queue or topic factory type once the destination type is known
+
+ String sType = _epr.getDestinationType();
+ if (JMSEpr.QUEUE_TYPE.equals(sType))
+ {
+ javax.jms.Queue queue = (javax.jms.Queue) oJndiCtx.lookup(_epr.getDestinationName());
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+ QueueConnection qConn = qcf.createQueueConnection();
+ QueueSession qSess = qConn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE); // false = non-transacted session
+ _jmsConnection = qConn; // NOTE(review): fields are assigned only after the session exists; if createQueueSession throws, qConn does not appear to be closed here -- confirm
+ _jmsSession = qSess;
+ _messageConsumer = qSess.createReceiver(queue,_epr.getMessageSelector());
+ qConn.start(); // connection is started only after the consumer has been created
+ }
+ else if(JMSEpr.TOPIC_TYPE.equals(sType))
+ {
+ Topic topic = (Topic) oJndiCtx.lookup(_epr.getDestinationName());
+ TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+ TopicConnection tConn = tcf.createTopicConnection();
+ TopicSession tSess = tConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE); // false = non-transacted session
+ _jmsConnection = tConn; // NOTE(review): same ordering as the queue branch; tConn does not appear to be closed if session creation throws -- confirm
+ _jmsSession = tSess;
+ _messageConsumer = tSess.createConsumer(topic,_epr.getMessageSelector());
+ tConn.start(); // connection is started only after the consumer has been created
+ }
+ else // destination type is neither JMSEpr.QUEUE_TYPE nor JMSEpr.TOPIC_TYPE
+ throw new CourierException("Unknown destination type");
+
+
+ } //________________________________
+
+ long _sleepForRetries = 3000; // milliseconds
+
+ protected boolean _isReceiver;
protected JMSEpr _epr;
protected Logger _logger = Logger.getLogger(JmsCourier.class);
protected String _messageSelector;
@@ -231,6 +332,5 @@
protected Session _jmsSession;
protected MessageProducer _messageProducer;
protected MessageConsumer _messageConsumer;
- protected long _sleepForRetries = 3000; // milliseconds
protected List<KeyValuePair> _messageProperties;
}
More information about the jboss-svn-commits
mailing list