[jboss-svn-commits] JBL Code SVN: r7390 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb: actions listeners/message

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Nov 3 17:27:56 EST 2006


Author: estebanschifman
Date: 2006-11-03 17:27:53 -0500 (Fri, 03 Nov 2006)
New Revision: 7390

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java
Log:
Add logic to receive response from router

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java	2006-11-03 21:14:09 UTC (rev 7389)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java	2006-11-03 22:27:53 UTC (rev 7390)
@@ -22,23 +22,40 @@
 
 package org.jboss.soa.esb.actions;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Iterator;
 
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
-import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.Courier;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.notification.*;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.message.CbrJmsQueueListener;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.notification.NotificationList;
 import org.jboss.soa.esb.services.registry.Registry;
-import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.services.registry.RegistryFactory;
 import org.jboss.soa.esb.services.routing.MessageRouter;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
 
 
 /**
@@ -49,42 +66,60 @@
  */
 public class CbrProxyAction
 {
+	
 
 	private static Logger _logger = Logger.getLogger(CbrProxyAction.class);
 
-	protected Message 		_message; //Why is this a class variable?
 	protected ConfigTree	_config;
 	
-    public CbrProxyAction(ConfigTree config) { _config = config; } 
+    public CbrProxyAction(ConfigTree config) throws Exception 
+    { 	
+    	_config = config; 
+		Registry registry = RegistryFactory.getRegistry(); 
+		String serviceCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+		String serviceName         = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+		_eprs = registry.findEPRs(serviceCategoryName, serviceName);
+		_lSleepForRetry = 500;
+    } 
     public Message noOperation(Message message) { return message; } 
     
     public Message process(Message message) 
     {
-    	_message = message;
+        _logger.info("Process was called.");
+
+    	// get a real UID into replyId
+    	String replyId = Long.toString(System.currentTimeMillis());
+    	message.getBody().add(CbrJmsQueueListener.CORRELATION_ID_TAG, replyId);
+
     	try {
-	    	//Can we make this static for optimization?
-	    	Registry registry = RegistryFactory.getRegistry(); 
-	    	String serviceCategoryName = _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
-	    	String serviceName         = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-			Collection<EPR> eprs = registry.findEPRs(serviceCategoryName, serviceName);
-			for (Iterator<EPR> eprIterator=eprs.iterator();eprIterator.hasNext();){
+			for (Iterator<EPR> eprIterator=_eprs.iterator();eprIterator.hasNext();){
 				//Just use the first EPR in the list.
 				EPR epr = eprIterator.next();
 				Courier courier = CourierFactory.getCourier(epr);
 				//If not successful try the next EPR
-				if (courier.deliver(message)) {
-					break;
+				if (courier.deliver(message)) 
+				{
+					return getResponse(replyId,epr);
 				}
 			}
-    	} catch (RegistryException re) {
-    		_logger.error("Could not access the registry. " + re.getLocalizedMessage(), re);
     	} catch (CourierException ce) {
     		_logger.error("Could not send to the CBR. " + ce.getLocalizedMessage(), ce);
     	}
-        _logger.info("Process was called.");
         return message;
     } // ________________________________
+    
+    private Message getResponse(String replyId, EPR epr)
+    {
+    	// For now, this proxy only works for a JMS epr
+    	if (! (epr instanceof JMSEpr))
+    		return null;
 
+    	StringBuilder sb = new StringBuilder()
+    		.append(CbrJmsQueueListener.CORRELATION_ID_TAG).append("='")
+    		.append(replyId).append("'")
+    		;
+    	return receiveResponse((JMSEpr)epr, sb.toString(), 5000);
+    }
     
     public Message addDestinationListToMessage(Message message) {
     	return null;
@@ -101,7 +136,7 @@
     public void exceptionCallback(Message message, Throwable t)
     {
     	String sMsg = new StringBuilder(" ExceptionTrower.exceptionCallback CALLED ")
-    	.append(_message)
+    	.append(message)
     	.toString();
     	_logger.fatal(sMsg,t);
     	@SuppressWarnings("unused") 
@@ -117,5 +152,113 @@
     	_logger.log(Priority.INFO, destinationServices);
     	
     }
+
+    private MessageConsumer getConsumer(JMSEpr epr, String selector) 
+    {
+        _Qconn = null;
+        _Qsess = null;
+        _Queue = null;
+
+        Exception thrown = null;
+        try
+        {
+            Context context = AppServerContext.getServerContext(epr.getJndiType(),epr.getJndiURL());
+        	Object tmp = context.lookup(epr.getConnectionFactory());
+        	QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+        	_Qconn = qcf.createQueueConnection();
+        	_Queue = (Queue) context.lookup(epr.getDestinationName());
+        	_Qsess = _Qconn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
+        	_Qconn.start();
+        	return _Qsess.createReceiver(_Queue, selector);
+        }
+        catch (URISyntaxException e)			{ thrown = e; }
+        catch (javax.naming.NamingException e)	{ thrown = e; }
+        catch (JMSException e) 					{ thrown = e; }
+        _logger.error("Unable to create response consumer",thrown);
+        return null;
+
+    } // ________________________________
+
+    protected Message receiveResponse(JMSEpr epr, String selector, long millis)
+    {
+    	MessageConsumer consumer = null;
+    	javax.jms.Message jmsMessage = null;
+        try 
+		{	
+        	consumer = getConsumer(epr, selector);
+        	jmsMessage = consumer.receive(millis);
+		}
+        catch (JMSException oJ)
+        {
+        	for (int i1 = 0; i1 < 3; i1++)
+        		// try to reconnect to the queue
+        		try 
+        		{ 
+        			consumer = getConsumer(epr, selector); 
+                	jmsMessage = consumer.receive(millis);
+        		} 
+        		catch (Exception e)
+        		{
+                	_logger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
+        			_logger.error("Reconnecting to Queue", e);
+        			try {	Thread.sleep(_lSleepForRetry); }
+        			catch (InterruptedException e1)
+        			{ // Just return after logging
+        				_logger.error("Unexpected thread interupt exception.", e);
+        				return null;
+                    }
+                 }
+        }
+        finally {cleanupJms(); }
+
+        if (null == jmsMessage)
+        	return null;
+
+        if (!(jmsMessage instanceof ObjectMessage))
+        {
+        	_logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+        	return null;
+        }
+        try
+        {
+        	Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject();
+        	return Util.deserialize(obj);
+        } 
+        catch (JMSException e1)
+        { _logger.error("Failed to read Serialized Object from JMS message.", e1);
+          return null;
+        }
+        catch (ClassCastException e2)
+        { _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+        }
+        catch (IOException e3)
+        { _logger.error("Object in JMS message is not a Serializeable", e3);
+        }
+        catch (ParserConfigurationException e4)
+        { _logger.error("Object in JMS message has invalid XML", e4);
+        }
+        catch (SAXException e5)
+        { _logger.error("Object in JMS message has invalid XML", e5);
+        }
+    	return null;
+    }
     
+
+    protected void cleanupJms() 
+    {
+        if (null != _Qsess)
+            try { _Qsess.close(); }
+            catch (Exception e1) {/* Tried my best - Just continue */ }
+        if (null != _Qconn)
+            try { _Qconn.close(); }
+        	catch (Exception e2) {/* Tried my best - Just continue */ }
+    }
+    
+    protected Collection<EPR> _eprs;
+    protected long			_lSleepForRetry;
+    protected QueueConnection _Qconn = null;
+    protected QueueSession 	_Qsess = null;
+    protected Queue			_Queue = null;
+    
 } // ____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java	2006-11-03 21:14:09 UTC (rev 7389)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListener.java	2006-11-03 22:27:53 UTC (rev 7390)
@@ -54,6 +54,8 @@
 
 public class CbrJmsQueueListener extends JmsQueueListener 
 {
+	public static final String CORRELATION_ID_TAG = "jmsCorrelationId";
+	
 	private static transient Logger _logger = Logger.getLogger(CbrJmsQueueListener.class);
 	
 	public CbrJmsQueueListener(EsbListenerController controller, ConfigTree config) throws ConfigurationException {




More information about the jboss-svn-commits mailing list