[jboss-svn-commits] JBL Code SVN: r7390 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb: actions listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Nov 3 17:27:56 EST 2006
Author: estebanschifman
Date: 2006-11-03 17:27:53 -0500 (Fri, 03 Nov 2006)
New Revision: 7390
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java
Log:
Add logic to receive response from router
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java 2006-11-03 21:14:09 UTC (rev 7389)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java 2006-11-03 22:27:53 UTC (rev 7390)
@@ -22,23 +22,40 @@
package org.jboss.soa.esb.actions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
+
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
-import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.notification.*;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.message.CbrJmsQueueListener;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.notification.NotificationList;
import org.jboss.soa.esb.services.registry.Registry;
-import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.services.registry.RegistryFactory;
import org.jboss.soa.esb.services.routing.MessageRouter;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
/**
@@ -49,42 +66,60 @@
*/
public class CbrProxyAction
{
+
private static Logger _logger = Logger.getLogger(CbrProxyAction.class);
- protected Message _message; //Why is this a class variable?
protected ConfigTree _config;
- public CbrProxyAction(ConfigTree config) { _config = config; }
+ /**
+  * Stores the action configuration and resolves, once, the endpoint
+  * references this proxy will deliver to.
+  * <p>
+  * The service category and service name are read from <code>config</code>
+  * and handed to the registry; the EPRs it returns are cached in
+  * <code>_eprs</code> and reused by every call to <code>process</code>.
+  * The retry pause <code>_lSleepForRetry</code> is set to 500 (it is passed
+  * to <code>Thread.sleep</code>, i.e. milliseconds).
+  *
+  * @param config configuration holding the service category/name attributes
+  * @throws Exception presumably when the registry cannot be obtained or
+  *         queried - TODO confirm against RegistryFactory/Registry
+  */
+ public CbrProxyAction(ConfigTree config) throws Exception
+ {
+ _config = config;
+ Registry registry = RegistryFactory.getRegistry();
+ String serviceCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ String serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ _eprs = registry.findEPRs(serviceCategoryName, serviceName);
+ _lSleepForRetry = 500;
+ }
/** Pass-through action: hands back the very message it was given. */
public Message noOperation(Message message) { return message; }
+ /**
+  * Forwards <code>message</code> to the content based router and waits for
+  * its reply.
+  * <p>
+  * A correlation id is stored in the message body under
+  * <code>CbrJmsQueueListener.CORRELATION_ID_TAG</code>, then the cached EPRs
+  * are tried in order until one courier reports a successful delivery. The
+  * value returned in that case is whatever <code>getResponse</code> yields
+  * for that EPR. If no delivery succeeds, or a CourierException is raised
+  * (it is logged), the original message is returned.
+  *
+  * @param message the message to route
+  * @return the router's response after a successful delivery, otherwise
+  *         <code>message</code> itself
+  */
public Message process(Message message)
{
- _message = message;
+ _logger.info("Process was called.");
+
+ // The reply id must be unique per request: it is the only thing the
+ // response selector matches on. A millisecond timestamp collides as soon
+ // as two requests start within the same millisecond, so use a random UUID.
+ String replyId = java.util.UUID.randomUUID().toString();
+ message.getBody().add(CbrJmsQueueListener.CORRELATION_ID_TAG, replyId);
+
try {
- //Can we make this static for optimization?
- Registry registry = RegistryFactory.getRegistry();
- String serviceCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- String serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- Collection<EPR> eprs = registry.findEPRs(serviceCategoryName, serviceName);
- for (Iterator<EPR> eprIterator=eprs.iterator();eprIterator.hasNext();){
+ for (Iterator<EPR> eprIterator=_eprs.iterator();eprIterator.hasNext();){
//Just use the first EPR in the list.
EPR epr = eprIterator.next();
Courier courier = CourierFactory.getCourier(epr);
//If not successful try the next EPR
- if (courier.deliver(message)) {
- break;
+ if (courier.deliver(message))
+ {
+ return getResponse(replyId,epr);
}
}
- } catch (RegistryException re) {
- _logger.error("Could not access the registry. " + re.getLocalizedMessage(), re);
} catch (CourierException ce) {
_logger.error("Could not send to the CBR. " + ce.getLocalizedMessage(), ce);
}
- _logger.info("Process was called.");
return message;
} // ________________________________
+
+ /**
+  * Fetches the reply that carries the given correlation id.
+  * <p>
+  * Only JMS endpoints are handled: for a <code>JMSEpr</code> a selector of
+  * the form <code>TAG='replyId'</code> is built and the reply is awaited for
+  * 5000 (passed as the <code>millis</code> argument of
+  * <code>receiveResponse</code>). Any other EPR type yields
+  * <code>null</code>.
+  *
+  * @param replyId correlation id that was put on the outgoing message
+  * @param epr     endpoint the outgoing message was delivered to
+  * @return the reply message, or <code>null</code> when the EPR is not a
+  *         JMS one or <code>receiveResponse</code> returned nothing
+  */
+ private Message getResponse(String replyId, EPR epr)
+ {
+     if (epr instanceof JMSEpr)
+     {
+         String selector = CbrJmsQueueListener.CORRELATION_ID_TAG + "='" + replyId + "'";
+         return receiveResponse((JMSEpr) epr, selector, 5000);
+     }
+     // For now, this proxy only works for a JMS epr
+     return null;
+ }
public Message addDestinationListToMessage(Message message) {
return null;
@@ -101,7 +136,7 @@
public void exceptionCallback(Message message, Throwable t)
{
String sMsg = new StringBuilder(" ExceptionTrower.exceptionCallback CALLED ")
- .append(_message)
+ .append(message)
.toString();
_logger.fatal(sMsg,t);
@SuppressWarnings("unused")
@@ -117,5 +152,113 @@
_logger.log(Priority.INFO, destinationServices);
}
+
+ /**
+  * Opens a queue connection and session for <code>epr</code> and creates a
+  * receiver on its destination queue, filtered by <code>selector</code>.
+  * <p>
+  * The connection, session and queue are kept in <code>_Qconn</code>,
+  * <code>_Qsess</code> and <code>_Queue</code>; the caller is expected to
+  * release them through <code>cleanupJms()</code>. Anything still held from
+  * an earlier call is closed first so that repeated calls (e.g. reconnect
+  * attempts) do not leak connections.
+  *
+  * @param epr      JMS endpoint supplying JNDI settings, connection factory
+  *                 and destination name
+  * @param selector JMS message selector for the receiver
+  * @return the receiver, or <code>null</code> if the JNDI lookup or the JMS
+  *         setup failed (the failure is logged)
+  */
+ private MessageConsumer getConsumer(JMSEpr epr, String selector)
+ {
+     // Close leftovers from a previous call before the references are lost.
+     cleanupJms();
+     _Qconn = null;
+     _Qsess = null;
+     _Queue = null;
+
+     Exception thrown = null;
+     try
+     {
+         Context context = AppServerContext.getServerContext(epr.getJndiType(),epr.getJndiURL());
+         Object tmp = context.lookup(epr.getConnectionFactory());
+         QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+         _Qconn = qcf.createQueueConnection();
+         _Queue = (Queue) context.lookup(epr.getDestinationName());
+         _Qsess = _Qconn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
+         // Delivery only begins once the connection has been started.
+         _Qconn.start();
+         return _Qsess.createReceiver(_Queue, selector);
+     }
+     catch (URISyntaxException e) { thrown = e; }
+     catch (javax.naming.NamingException e) { thrown = e; }
+     catch (JMSException e) { thrown = e; }
+     _logger.error("Unable to create response consumer",thrown);
+     return null;
+
+ } // ________________________________
+
+ /**
+  * Waits for a reply on the JMS queue described by <code>epr</code> and
+  * converts it into an ESB message.
+  * <p>
+  * A consumer is created with the given selector and polled for up to
+  * <code>millis</code>. If the consumer cannot be created or the receive
+  * fails with a JMSException, the JMS resources are released and the
+  * attempt is repeated, at most three more times, pausing
+  * <code>_lSleepForRetry</code> milliseconds before each new attempt. The
+  * loop stops at the first receive that completes. JMS resources are always
+  * released before this method returns.
+  *
+  * @param epr      JMS endpoint to read the reply from
+  * @param selector JMS message selector identifying the expected reply
+  * @param millis   timeout handed to each <code>receive</code> call
+  * @return the deserialized message, or <code>null</code> when nothing was
+  *         received, all attempts failed, the thread was interrupted, the
+  *         JMS message was not an <code>ObjectMessage</code>, or its payload
+  *         could not be deserialized
+  */
+ protected Message receiveResponse(JMSEpr epr, String selector, long millis)
+ {
+     final int maxReconnects = 3;
+     javax.jms.Message jmsMessage = null;
+     try
+     {
+         for (int attempt = 0; ; attempt++)
+         {
+             try
+             {
+                 MessageConsumer consumer = getConsumer(epr, selector);
+                 // getConsumer reports failure with null; treat it like any
+                 // other JMS failure instead of dereferencing it.
+                 if (null == consumer)
+                     throw new JMSException("Unable to create response consumer");
+                 jmsMessage = consumer.receive(millis);
+                 // Done (message or timeout): do not poll again, a further
+                 // receive would overwrite what was just read.
+                 break;
+             }
+             catch (JMSException e)
+             {
+                 if (attempt >= maxReconnects)
+                 {
+                     _logger.error("JMS error on receive. Giving up after "
+                             + maxReconnects + " reconnect attempts.", e);
+                     return null;
+                 }
+                 _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", e);
+                 // Drop the broken connection before opening a new one.
+                 cleanupJms();
+                 try { Thread.sleep(_lSleepForRetry); }
+                 catch (InterruptedException ie)
+                 {   // Just return after logging, keeping the interrupt visible to callers
+                     _logger.error("Unexpected thread interupt exception.", ie);
+                     Thread.currentThread().interrupt();
+                     return null;
+                 }
+             }
+         }
+     }
+     finally { cleanupJms(); }
+
+     if (null == jmsMessage)
+         return null;
+
+     if (!(jmsMessage instanceof ObjectMessage))
+     {
+         _logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+         return null;
+     }
+     try
+     {
+         Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject();
+         return Util.deserialize(obj);
+     }
+     catch (JMSException e1)
+     {   _logger.error("Failed to read Serialized Object from JMS message.", e1);
+         return null;
+     }
+     catch (ClassCastException e2)
+     {   _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+     }
+     catch (IOException e3)
+     {   _logger.error("Object in JMS message is not a Serializeable", e3);
+     }
+     catch (ParserConfigurationException e4)
+     {   _logger.error("Object in JMS message has invalid XML", e4);
+     }
+     catch (SAXException e5)
+     {   _logger.error("Object in JMS message has invalid XML", e5);
+     }
+     return null;
+ }
+
+ /**
+  * Closes the JMS session and connection held by this action, if any.
+  * <p>
+  * Cleanup is best effort: a failure to close is logged as a warning and
+  * never propagated. Each reference is cleared once its close has been
+  * attempted, so calling this method again is a no-op.
+  */
+ protected void cleanupJms()
+ {
+     if (null != _Qsess)
+     {
+         try { _Qsess.close(); }
+         catch (Exception e1)
+         {   /* Tried my best - Just continue */
+             _logger.warn("Failed to close JMS session", e1);
+         }
+         finally { _Qsess = null; }
+     }
+     if (null != _Qconn)
+     {
+         try { _Qconn.close(); }
+         catch (Exception e2)
+         {   /* Tried my best - Just continue */
+             _logger.warn("Failed to close JMS connection", e2);
+         }
+         finally { _Qconn = null; }
+     }
+ }
+
+ // Endpoint references for the target service, looked up once in the constructor.
+ protected Collection<EPR> _eprs;
+ // Pause between reconnect attempts; passed to Thread.sleep (milliseconds).
+ protected long _lSleepForRetry;
+ // JMS handles opened by getConsumer() and released by cleanupJms().
+ protected QueueConnection _Qconn = null;
+ protected QueueSession _Qsess = null;
+ // Destination queue resolved by getConsumer() through JNDI.
+ protected Queue _Queue = null;
+
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java 2006-11-03 21:14:09 UTC (rev 7389)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java 2006-11-03 22:27:53 UTC (rev 7390)
@@ -54,6 +54,8 @@
public class CbrJmsQueueListener extends JmsQueueListener
{
+ public static final String CORRELATION_ID_TAG = "jmsCorrelationId";
+
private static transient Logger _logger = Logger.getLogger(CbrJmsQueueListener.class);
public CbrJmsQueueListener(EsbListenerController controller, ConfigTree config) throws ConfigurationException {
More information about the jboss-svn-commits
mailing list