[jboss-svn-commits] JBL Code SVN: r7455 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners/message services/src/org/jboss/internal/soa/esb/services/routing/cbr services/src/org/jboss/soa/esb/services/routing services/src/org/jboss/soa/esb/services/routing/cbr

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Nov 7 18:06:30 EST 2006


Author: kurt.stam at jboss.com
Date: 2006-11-07 18:06:26 -0500 (Tue, 07 Nov 2006)
New Revision: 7455

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java
Log:
Cleaning up CbrProxyAction code.

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -59,6 +59,10 @@
     protected Collection<EPR> _eprs;
     /** Configuration Tree */
 	protected ConfigTree _config;
+	/** Default 1 minute Listener Timeout in millis */
+	final private static long DEFAULT_REPLY_LISTENER_TIMEOUT = 60000;
+	/** replyListenerTimout */
+	private long _replyListenerTimeout = DEFAULT_REPLY_LISTENER_TIMEOUT;
 	
 	/**
 	 * Constructor of the Content Based Router Proxy. This constructor obtains
@@ -69,10 +73,18 @@
 	 */
     public CbrProxyAction(ConfigTree config) throws RegistryException
     { 	
-		Registry registry = RegistryFactory.getRegistry(); 
+		Registry registry          = RegistryFactory.getRegistry(); 
 		String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
 		String serviceName         = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-		_eprs = registry.findEPRs(serviceCategoryName, serviceName);
+		_eprs                      = registry.findEPRs(serviceCategoryName, serviceName);
+		_config                    = config;
+		if (config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG)!=null) {
+			try {
+				_replyListenerTimeout = Long.parseLong(config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG));
+			} catch (NumberFormatException nfe) {
+				_logger.error("Could not parse " + config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG) + " to long.", nfe);
+			}
+		}
     }
     /**
      * No-Op method. This is here for notification purposes.
@@ -97,7 +109,7 @@
      * Just route. The routing returns the message with a Collection of destinationService names.
      * 
      * @param message
-     * @return
+     * @return message, a collection of service destinations is added to the message.
      */
     public Message route(Message message) 
     {
@@ -107,44 +119,37 @@
     	try {
     		
 			for (Iterator<EPR> eprIterator=_eprs.iterator();eprIterator.hasNext();){
-				//Just use the first EPR in the list.
-				EPR epr = eprIterator.next();
-				Courier courier = CourierFactory.getCourier(epr);
-				//If not successful try the next EPR
-				if (epr instanceof JMSEpr) {
-					//Setting the replyTo to the queue we specific in the epr we are going to call
-					//The selectors will do the job.
-					if (message.getHeader().getCall()==null) {
-						Call call = new Call();
-						message.getHeader().setCall(call);
-					}
-
-					JMSEpr jpr = (JMSEpr)epr;
-					JMSEpr replyEpr = null;
-					try
-					{ replyEpr = 
-							new JMSEpr(jpr.getDestinationType()
-									,jpr.getDestinationName()
-									,jpr.getConnectionFactory()
-									,jpr.getJndiType()
-									,jpr.getJndiURL()
-									,jmsQueueReplyListener.getReplySelector()
-									);
-					}
-					catch (URISyntaxException e)
-					{
-						continue;
-					}
-					message.getHeader().getCall().setReplyTo(replyEpr);
-
-					if (courier.deliver(message)) {
-						replyMessage = jmsQueueReplyListener.listen(replyEpr);
-						break;
+				try {
+					//Just use the first EPR in the list.
+					EPR epr = eprIterator.next();
+					Courier courier = CourierFactory.getCourier(epr);
+					//If not successful try the next EPR
+					if (epr instanceof JMSEpr) {
+						//Setting the replyTo to the queue we specific in the epr we are going to call
+						//The selectors will do the job.
+						JMSEpr jpr = (JMSEpr)epr;
+						JMSEpr replyEpr =  new JMSEpr(jpr.getDestinationType()  ,jpr.getDestinationName()
+							,jpr.getConnectionFactory(),jpr.getJndiType(),jpr.getJndiURL()
+							,jmsQueueReplyListener.getReplySelector());
+						if (message.getHeader().getCall()==null) {
+							Call call = new Call();
+							message.getHeader().setCall(call);
+						}
+						message.getHeader().getCall().setReplyTo(replyEpr);
+						//Send the message and wait for the reply.
+						if (courier.deliver(message)) {
+							replyMessage = jmsQueueReplyListener.listen(replyEpr, _replyListenerTimeout);
+							break;
+						} else {
+							_logger.warn("Could not deliver the message, maybe there is another JMS-EPR we can use.");
+						}
 					} else {
-						_logger.warn("Could not deliver the message, maybe there is another JMS-EPR we can use.");
+						_logger.warn("Found a non-JMS based EPR, but since we want a reply we need a JMS-EPR");
 					}
-				} else {
-					_logger.warn("Found a non-JMS EPR, but since we want a reply we need a JMS-EPR");
+				} catch (URISyntaxException e) {
+					_logger.error(e.getLocalizedMessage(), e);
+					//Let's be optimistic and try to see if there is another epr.
+					continue;
 				}
 			}
     	} catch (CourierException ce) {
@@ -152,12 +157,29 @@
     	}
         return replyMessage;
     }
-    
-    public Message deliverToDestinations(Message message) {
-    	
-    	return null;
+    /**
+     * Do *not* route, just deliver. Pulls the destinationServices from the message and deliver the message to the destination(s).
+     * @param message 
+     * @return
+     */
+    @SuppressWarnings("unchecked")
+	public Message deliver(Message message) 
+    {
+    	Collection<String> destinationServices = (Collection) message.getProperties().getProperty(MessageRouter.ROUTING_DESTINATION_SERVICE_LIST);
+    	if (destinationServices!=null) {
+	    	//TODO Remove the property from the message?
+	    	MessageRouter.deliverMessages(destinationServices, message);
+    	} else {
+    		_logger.error("No destination services were found, the message was not delivered");
+    	}
+    	return message;
     }
-    
+    /**
+     * Route and Deliver to the destination Services.
+     * 
+     * @param message - to be routed and delivered
+     * @return message - same message is returned
+     */
     public Message routeAndDeliver(Message message) 
     {
     	try {

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -37,4 +37,6 @@
     public static final String FILE_POST_SFX_TAG			= "postSuffix";
     public static final String FILE_POST_DEL_TAG			= "postDelete";
     
+    /** Reply Listener */
+    public static final String REPLY_LISTENER_TIMEOUT_TAG   = "replyListenerTimeout";
 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -57,8 +57,8 @@
 	public static final String CORRELATION_ID_TAG = "jmsCorrelationId";
 	
 	private static Logger _logger = Logger.getLogger(JmsQueueReplyListener.class);
-	private static long SLEEP_FOR_RETRY = 500;
-	private static long LISTEN_TIME_OUT = 50000;
+	/** maybe we have network issues, sleep 5 seconds before trying again. */
+	private static long SLEEP_FOR_RETRY = 5000;
 	private QueueConnection _Qconn = null;
 	private QueueSession 	_Qsess = null;
 	private Queue			_Queue = null;
@@ -109,7 +109,7 @@
         return null;
     }
 
-    public Message listen(JMSEpr epr)
+    public Message listen(JMSEpr epr, long replyListenerTimout)
     {
     	MessageConsumer consumer = null;
     	javax.jms.Message jmsMessage = null;
@@ -118,7 +118,7 @@
         		try 
         		{ 
         			consumer = getConsumer(epr); 
-                	jmsMessage = consumer.receive(LISTEN_TIME_OUT);
+                	jmsMessage = consumer.receive(replyListenerTimout);
                 	break;
         		} 
         		catch (Exception e)

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -4,9 +4,7 @@
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -17,20 +15,13 @@
 import org.drools.WorkingMemory;
 import org.drools.compiler.PackageBuilder;
 import org.drools.rule.Package;
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
-import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.routing.MessageRouter;
 import org.jboss.soa.esb.services.routing.MessageRouterException;
 import org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter;
 
 
-public class JBossRulesRouter implements ContentBasedRouter 
+public class JBossRulesRouter extends ContentBasedRouter 
 {
 	private static Map<String,WorkingMemory> workingMemories=new HashMap<String,WorkingMemory>();
 	private static Logger logger = Logger.getLogger(JBossRulesRouter.class);
@@ -103,48 +94,4 @@
 		ruleBase.addPackage(pkg);
 		return ruleBase;
 	}
-	/**
-	 * Sends the message on to the service with the name(s) we just obtained from the routing.
-	 * 
-	 * @param destinationServices - Collection with the name of the destination services.
-	 * @param message             - the message that needs routing and delivery
-	 */
-	private void deliverMessages(Collection<String> destinationServices, Message message) 
-	{
-		for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
-			String destinationService = i.next();
-			String[] strArray = destinationService.split(":");
-			String category = strArray[0];
-			String serviceName = strArray[1];
-			boolean isSent=false;
-			try {
-				Registry registry = RegistryFactory.getRegistry();
-				logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
-						" and serviceName=" + serviceName);
-				Collection<EPR> eprs = registry.findEPRs(category, serviceName);
-				for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
-					EPR epr = eprs.iterator().next();
-					logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
-					try {
-						//Give the message to the courier
-						Courier courier = CourierFactory.getCourier(epr);
-						courier.deliver(message);
-						isSent=true;
-						break;
-					} catch (CourierException ce) {
-						logger.log(Priority.ERROR, "Could not send using epr:" + epr);
-						//if there are more eprs in the collection is will try the next one.
-					}
-				}
-				if (isSent==false) {
-					logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
-					//Route to /dev/null?
-				}
-			} catch (RegistryException re) {
-				logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
-				//Route to /dev/null?
-			}
-		}
-	}
-
 }

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -21,24 +21,80 @@
  */
 package org.jboss.soa.esb.services.routing;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
 /**
  * Generic Message Router Interface.
  * 
  * @author kurt.stam at redhat.com
  *
  */
-public interface MessageRouter 
+public abstract class MessageRouter 
 {
-	String ROUTING_DESTINATION_SERVICE_LIST = "routing.destinationServiceList";
-	String DELIVER_MESSAGES                 = "routing.deliverMessages";
+	private static Logger logger = Logger.getLogger(MessageRouter.class);
+	public static String ROUTING_DESTINATION_SERVICE_LIST = "routing.destinationServiceList";
+	public static String DELIVER_MESSAGES                 = "routing.deliverMessages";
 	/**
 	 * Routes the message to the next destination. 
 	 * 
 	 * @param message
 	 * @return List of Strings containing the services to which the message was routed.
 	 */
-	public List<String> route(Message message);
+	public abstract List<String> route(Message message);
+	/**
+	 * Sends the message on to the service with the name(s) we just obtained from the routing.
+	 * 
+	 * @param destinationServices - Collection with the name of the destination services.
+	 * @param message             - the message that needs routing and delivery
+	 */
+	public synchronized static void deliverMessages(Collection<String> destinationServices, Message message) 
+	{
+		for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
+			String destinationService = i.next();
+			String[] strArray = destinationService.split(":");
+			String category = strArray[0];
+			String serviceName = strArray[1];
+			boolean isSent=false;
+			try {
+				Registry registry = RegistryFactory.getRegistry();
+				logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
+						" and serviceName=" + serviceName);
+				Collection<EPR> eprs = registry.findEPRs(category, serviceName);
+				for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
+					EPR epr = eprs.iterator().next();
+					logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
+					try {
+						//Give the message to the courier
+						Courier courier = CourierFactory.getCourier(epr);
+						courier.deliver(message);
+						isSent=true;
+						break;
+					} catch (CourierException ce) {
+						logger.log(Priority.ERROR, "Could not send using epr:" + epr);
+						//if there are more eprs in the collection is will try the next one.
+					}
+				}
+				if (isSent==false) {
+					logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
+					//Route to /dev/null?
+				}
+			} catch (RegistryException re) {
+				logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
+				//Route to /dev/null?
+			}
+		}
+	}
+
 }

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java	2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java	2006-11-07 23:06:26 UTC (rev 7455)
@@ -32,7 +32,7 @@
  * @author kurt.stam at redhat.com
  *
  */
-public interface ContentBasedRouter extends MessageRouter {
+public abstract class ContentBasedRouter extends MessageRouter {
 	/**
 	 * Route a message using a certain ruleSet.
 	 * 
@@ -40,5 +40,5 @@
 	 * @param message - Message that needs routing.
 	 * @return
 	 */
-	public List<String> route(String ruleSet, Message message);
+	public abstract List<String> route(String ruleSet, Message message);
 }




More information about the jboss-svn-commits mailing list