[jboss-svn-commits] JBL Code SVN: r7455 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners/message services/src/org/jboss/internal/soa/esb/services/routing/cbr services/src/org/jboss/soa/esb/services/routing services/src/org/jboss/soa/esb/services/routing/cbr
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Nov 7 18:06:30 EST 2006
Author: kurt.stam at jboss.com
Date: 2006-11-07 18:06:26 -0500 (Tue, 07 Nov 2006)
New Revision: 7455
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java
Log:
Cleaning up CbrProxyAction code.
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/CbrProxyAction.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -59,6 +59,10 @@
protected Collection<EPR> _eprs;
/** Configuration Tree */
protected ConfigTree _config;
+ /** Default 1 minute Listener Timeout in millis */
+ final private static long DEFAULT_REPLY_LISTENER_TIMEOUT = 60000;
+ /** replyListenerTimout */
+ private long _replyListenerTimeout = DEFAULT_REPLY_LISTENER_TIMEOUT;
/**
* Constructor of the Content Based Router Proxy. This constructor obtains
@@ -69,10 +73,18 @@
*/
public CbrProxyAction(ConfigTree config) throws RegistryException
{
- Registry registry = RegistryFactory.getRegistry();
+ Registry registry = RegistryFactory.getRegistry();
String serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
String serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _eprs = registry.findEPRs(serviceCategoryName, serviceName);
+ _eprs = registry.findEPRs(serviceCategoryName, serviceName);
+ _config = config;
+ if (config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG)!=null) {
+ try {
+ _replyListenerTimeout = Long.parseLong(config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG));
+ } catch (NumberFormatException nfe) {
+ _logger.error("Could not parse " + config.getAttribute(ListenerTagNames.REPLY_LISTENER_TIMEOUT_TAG) + " to long.", nfe);
+ }
+ }
}
/**
* No-Op method. This is here for notification purposes.
@@ -97,7 +109,7 @@
* Just route. The routing returns the message with a Collection of destinationService names.
*
* @param message
- * @return
+ * @return message, a collection of service destinations is added to the message.
*/
public Message route(Message message)
{
@@ -107,44 +119,37 @@
try {
for (Iterator<EPR> eprIterator=_eprs.iterator();eprIterator.hasNext();){
- //Just use the first EPR in the list.
- EPR epr = eprIterator.next();
- Courier courier = CourierFactory.getCourier(epr);
- //If not successful try the next EPR
- if (epr instanceof JMSEpr) {
- //Setting the replyTo to the queue we specific in the epr we are going to call
- //The selectors will do the job.
- if (message.getHeader().getCall()==null) {
- Call call = new Call();
- message.getHeader().setCall(call);
- }
-
- JMSEpr jpr = (JMSEpr)epr;
- JMSEpr replyEpr = null;
- try
- { replyEpr =
- new JMSEpr(jpr.getDestinationType()
- ,jpr.getDestinationName()
- ,jpr.getConnectionFactory()
- ,jpr.getJndiType()
- ,jpr.getJndiURL()
- ,jmsQueueReplyListener.getReplySelector()
- );
- }
- catch (URISyntaxException e)
- {
- continue;
- }
- message.getHeader().getCall().setReplyTo(replyEpr);
-
- if (courier.deliver(message)) {
- replyMessage = jmsQueueReplyListener.listen(replyEpr);
- break;
+ try {
+ //Just use the first EPR in the list.
+ EPR epr = eprIterator.next();
+ Courier courier = CourierFactory.getCourier(epr);
+ //If not successful try the next EPR
+ if (epr instanceof JMSEpr) {
+ //Setting the replyTo to the queue we specific in the epr we are going to call
+ //The selectors will do the job.
+ JMSEpr jpr = (JMSEpr)epr;
+ JMSEpr replyEpr = new JMSEpr(jpr.getDestinationType() ,jpr.getDestinationName()
+ ,jpr.getConnectionFactory(),jpr.getJndiType(),jpr.getJndiURL()
+ ,jmsQueueReplyListener.getReplySelector());
+ if (message.getHeader().getCall()==null) {
+ Call call = new Call();
+ message.getHeader().setCall(call);
+ }
+ message.getHeader().getCall().setReplyTo(replyEpr);
+ //Send the message and wait for the reply.
+ if (courier.deliver(message)) {
+ replyMessage = jmsQueueReplyListener.listen(replyEpr, _replyListenerTimeout);
+ break;
+ } else {
+ _logger.warn("Could not deliver the message, maybe there is another JMS-EPR we can use.");
+ }
} else {
- _logger.warn("Could not deliver the message, maybe there is another JMS-EPR we can use.");
+ _logger.warn("Found a non-JMS based EPR, but since we want a reply we need a JMS-EPR");
}
- } else {
- _logger.warn("Found a non-JMS EPR, but since we want a reply we need a JMS-EPR");
+ } catch (URISyntaxException e) {
+ _logger.error(e.getLocalizedMessage(), e);
+ //Let's be optimistic and try to see if there is another epr.
+ continue;
}
}
} catch (CourierException ce) {
@@ -152,12 +157,29 @@
}
return replyMessage;
}
-
- public Message deliverToDestinations(Message message) {
-
- return null;
+ /**
+ * Do *not* route, just deliver. Pulls the destinationServices from the message and deliver the message to the destination(s).
+ * @param message
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public Message deliver(Message message)
+ {
+ Collection<String> destinationServices = (Collection) message.getProperties().getProperty(MessageRouter.ROUTING_DESTINATION_SERVICE_LIST);
+ if (destinationServices!=null) {
+ //TODO Remove the property from the message?
+ MessageRouter.deliverMessages(destinationServices, message);
+ } else {
+ _logger.error("No destination services were found, the message was not delivered");
+ }
+ return message;
}
-
+ /**
+ * Route and Deliver to the destination Services.
+ *
+ * @param message - to be routed and delivered
+ * @return message - same message is returned
+ */
public Message routeAndDeliver(Message message)
{
try {
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -37,4 +37,6 @@
public static final String FILE_POST_SFX_TAG = "postSuffix";
public static final String FILE_POST_DEL_TAG = "postDelete";
+ /** Reply Listener */
+ public static final String REPLY_LISTENER_TIMEOUT_TAG = "replyListenerTimeout";
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueReplyListener.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -57,8 +57,8 @@
public static final String CORRELATION_ID_TAG = "jmsCorrelationId";
private static Logger _logger = Logger.getLogger(JmsQueueReplyListener.class);
- private static long SLEEP_FOR_RETRY = 500;
- private static long LISTEN_TIME_OUT = 50000;
+ /** maybe we have network issues, sleep 5 seconds before trying again. */
+ private static long SLEEP_FOR_RETRY = 5000;
private QueueConnection _Qconn = null;
private QueueSession _Qsess = null;
private Queue _Queue = null;
@@ -109,7 +109,7 @@
return null;
}
- public Message listen(JMSEpr epr)
+ public Message listen(JMSEpr epr, long replyListenerTimout)
{
MessageConsumer consumer = null;
javax.jms.Message jmsMessage = null;
@@ -118,7 +118,7 @@
try
{
consumer = getConsumer(epr);
- jmsMessage = consumer.receive(LISTEN_TIME_OUT);
+ jmsMessage = consumer.receive(replyListenerTimout);
break;
}
catch (Exception e)
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -4,9 +4,7 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -17,20 +15,13 @@
import org.drools.WorkingMemory;
import org.drools.compiler.PackageBuilder;
import org.drools.rule.Package;
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
-import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.routing.MessageRouter;
import org.jboss.soa.esb.services.routing.MessageRouterException;
import org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter;
-public class JBossRulesRouter implements ContentBasedRouter
+public class JBossRulesRouter extends ContentBasedRouter
{
private static Map<String,WorkingMemory> workingMemories=new HashMap<String,WorkingMemory>();
private static Logger logger = Logger.getLogger(JBossRulesRouter.class);
@@ -103,48 +94,4 @@
ruleBase.addPackage(pkg);
return ruleBase;
}
- /**
- * Sends the message on to the service with the name(s) we just obtained from the routing.
- *
- * @param destinationServices - Collection with the name of the destination services.
- * @param message - the message that needs routing and delivery
- */
- private void deliverMessages(Collection<String> destinationServices, Message message)
- {
- for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
- String destinationService = i.next();
- String[] strArray = destinationService.split(":");
- String category = strArray[0];
- String serviceName = strArray[1];
- boolean isSent=false;
- try {
- Registry registry = RegistryFactory.getRegistry();
- logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
- " and serviceName=" + serviceName);
- Collection<EPR> eprs = registry.findEPRs(category, serviceName);
- for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
- EPR epr = eprs.iterator().next();
- logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
- try {
- //Give the message to the courier
- Courier courier = CourierFactory.getCourier(epr);
- courier.deliver(message);
- isSent=true;
- break;
- } catch (CourierException ce) {
- logger.log(Priority.ERROR, "Could not send using epr:" + epr);
- //if there are more eprs in the collection is will try the next one.
- }
- }
- if (isSent==false) {
- logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
- //Route to /dev/null?
- }
- } catch (RegistryException re) {
- logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
- //Route to /dev/null?
- }
- }
- }
-
}
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -21,24 +21,80 @@
*/
package org.jboss.soa.esb.services.routing;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
/**
* Generic Message Router Interface.
*
* @author kurt.stam at redhat.com
*
*/
-public interface MessageRouter
+public abstract class MessageRouter
{
- String ROUTING_DESTINATION_SERVICE_LIST = "routing.destinationServiceList";
- String DELIVER_MESSAGES = "routing.deliverMessages";
+ private static Logger logger = Logger.getLogger(MessageRouter.class);
+ public static String ROUTING_DESTINATION_SERVICE_LIST = "routing.destinationServiceList";
+ public static String DELIVER_MESSAGES = "routing.deliverMessages";
/**
* Routes the message to the next destination.
*
* @param message
* @return List of Strings containing the services to which the message was routed.
*/
- public List<String> route(Message message);
+ public abstract List<String> route(Message message);
+ /**
+ * Sends the message on to the service with the name(s) we just obtained from the routing.
+ *
+ * @param destinationServices - Collection with the name of the destination services.
+ * @param message - the message that needs routing and delivery
+ */
+ public synchronized static void deliverMessages(Collection<String> destinationServices, Message message)
+ {
+ for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
+ String destinationService = i.next();
+ String[] strArray = destinationService.split(":");
+ String category = strArray[0];
+ String serviceName = strArray[1];
+ boolean isSent=false;
+ try {
+ Registry registry = RegistryFactory.getRegistry();
+ logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
+ " and serviceName=" + serviceName);
+ Collection<EPR> eprs = registry.findEPRs(category, serviceName);
+ for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
+ EPR epr = eprs.iterator().next();
+ logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
+ try {
+ //Give the message to the courier
+ Courier courier = CourierFactory.getCourier(epr);
+ courier.deliver(message);
+ isSent=true;
+ break;
+ } catch (CourierException ce) {
+ logger.log(Priority.ERROR, "Could not send using epr:" + epr);
+ //if there are more eprs in the collection is will try the next one.
+ }
+ }
+ if (isSent==false) {
+ logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
+ //Route to /dev/null?
+ }
+ } catch (RegistryException re) {
+ logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
+ //Route to /dev/null?
+ }
+ }
+ }
+
}
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java 2006-11-07 22:19:00 UTC (rev 7454)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/cbr/ContentBasedRouter.java 2006-11-07 23:06:26 UTC (rev 7455)
@@ -32,7 +32,7 @@
* @author kurt.stam at redhat.com
*
*/
-public interface ContentBasedRouter extends MessageRouter {
+public abstract class ContentBasedRouter extends MessageRouter {
/**
* Route a message using a certain ruleSet.
*
@@ -40,5 +40,5 @@
* @param message - Message that needs routing.
* @return
*/
- public List<String> route(String ruleSet, Message message);
+ public abstract List<String> route(String ruleSet, Message message);
}
More information about the jboss-svn-commits
mailing list