[jboss-dev-forums] [Design of JBoss ESB] - Re: Design of a Content Based Routing Service

ddunkin do-not-reply at jboss.com
Thu Nov 9 16:17:54 EST 2006


I have a few concerns with the router. Hopefully these aren't issues and it's just me being naive.

Is the method of caching WorkingMemories thread safe? I assume (maybe falsely?) that the ESB is multi-threaded, that is, it can process multiple messages at once on separate threads. I see two problems here. First, the cache is a plain static HashMap, which is not safe for concurrent reads and writes. Second, if two messages with the same ruleset come in for routing at once, both threads share the same WorkingMemory object and can walk all over each other: how can you be sure which message or destinationServices list the rules are operating on? The patch below addresses both by switching to a ConcurrentHashMap and synchronizing on the WorkingMemory. One alternative would be to pool WorkingMemory objects.

I also fixed what I think is a memory leak: the Message was asserted into the WorkingMemory but never retracted, so it was retained there forever. The patch retracts it once the rules have fired.


  | Index: F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
  | ===================================================================
  | --- F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java	(revision 7517)
  | +++ F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java	(working copy)
  | @@ -25,12 +25,13 @@
  |  import java.io.InputStreamReader;
  |  import java.io.Reader;
  |  import java.util.ArrayList;
  | -import java.util.HashMap;
  |  import java.util.List;
  |  import java.util.Map;
  | +import java.util.concurrent.ConcurrentHashMap;
  |  
  |  import org.apache.log4j.Logger;
  |  import org.apache.log4j.Priority;
  | +import org.drools.FactHandle;
  |  import org.drools.RuleBase;
  |  import org.drools.RuleBaseFactory;
  |  import org.drools.WorkingMemory;
  | @@ -49,7 +50,7 @@
  |   */
  |  public class JBossRulesRouter extends ContentBasedRouter 
  |  {
  | -	private static Map<String,WorkingMemory> workingMemories=new HashMap<String,WorkingMemory>();
  | +	private static Map<String,WorkingMemory> workingMemories=new ConcurrentHashMap<String,WorkingMemory>();
  |  	private static Logger logger = Logger.getLogger(JBossRulesRouter.class);
  |  	/**
  |  	 * Route the message, where the routing rules are supplied as part of
  | @@ -87,20 +88,22 @@
  |  	{
  |  		List<String> destinationServices = new ArrayList<String>();
  |  		try {
  | -			if (!workingMemories.containsKey(ruleSet)) {
  | +			WorkingMemory workingMemory = workingMemories.get(ruleSet);
  | +			if (workingMemory == null) {
  |  			    logger.log(Priority.INFO, "Reading ruleSet from file=" + ruleSet);
  |  				RuleBase ruleBase = readRuleBase(ruleSet, ruleLanguage);
  | -				WorkingMemory workingMemory = ruleBase.newWorkingMemory();
  | +				workingMemory = ruleBase.newWorkingMemory();
  |  				workingMemories.put(ruleSet, workingMemory);
  |  			}
  |  			logger.log(Priority.DEBUG, "Obtained message=" + message + " with ruleSet=" + ruleSet);
  | -			WorkingMemory workingMemory = workingMemories.get(ruleSet);
  | -			workingMemory.setGlobal("destinationServices", destinationServices);
  | -			workingMemory.assertObject(message);
  | -			logger.log(Priority.INFO, "Fire the rules engine");
  | -			workingMemory.fireAllRules();
  | +			synchronized (workingMemory) {
  | +				workingMemory.setGlobal("destinationServices", destinationServices);
  | +				FactHandle factHandle = workingMemory.assertObject(message);
  | +				logger.log(Priority.INFO, "Fire the rules engine");
  | +				workingMemory.fireAllRules();
  | +				workingMemory.retractObject(factHandle);
  | +			}
  |  			//Now route there, later we will implement an option to place a callback.
  | -			destinationServices = (List) workingMemory.getGlobal("destinationServices");
  |  			logger.log(Priority.DEBUG, "Destination Services List: " + destinationServices);
  |  			Boolean deliverMessages = (Boolean) message.getProperties().getProperty(MessageRouter.DELIVER_MESSAGES);
  |  			//Only actuall deliver the message if this is set in the message
  | 
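
One thing the patch does not cover: the get-then-put on the cache is still not atomic, so two threads that hit a new ruleset at the same moment can each build their own WorkingMemory. Each thread locks the one it holds, so nothing gets corrupted, but the rules get read twice. A rough sketch of closing that gap with putIfAbsent is below. OncePerKeyCache and Loader are names I made up for illustration; V stands in for WorkingMemory and the loader for the readRuleBase/newWorkingMemory step. The lambda-friendly interface assumes a newer JDK than we build with today.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper, not part of the ESB; V stands in for org.drools.WorkingMemory. */
final class OncePerKeyCache<V> {

    /** Stand-in for the readRuleBase(...) + newWorkingMemory() step. */
    interface Loader<V> {
        V load(String key);
    }

    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<String, V>();

    V get(String key, Loader<V> loader) {
        V existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        // Several threads may reach this point for the same key and each build a value.
        V created = loader.load(key);
        // Only the first put wins; later callers get the winner back and drop their own.
        V winner = cache.putIfAbsent(key, created);
        return winner != null ? winner : created;
    }
}
```

The loader can still run more than once under contention, but every caller ends up holding the same cached instance, so they all synchronize on the same object.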

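For the pooling alternative, here is a minimal sketch of what I have in mind, not a tested implementation. SessionPool and withSession are hypothetical names; T stands in for WorkingMemory, and the factory would wrap ruleBase.newWorkingMemory(). It uses java.util.function, so it assumes a newer JDK than the patch above needs.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool; T stands in for org.drools.WorkingMemory. */
final class SessionPool<T> {
    private final BlockingQueue<T> idle;

    SessionPool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<T>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Borrows an instance, runs the work on it, and always hands it back. */
    <R> R withSession(Function<T, R> work) {
        T session;
        try {
            session = idle.take(); // blocks while every instance is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting for a session", e);
        }
        try {
            return work.apply(session);
        } finally {
            idle.add(session); // never overflows: we only return what we took
        }
    }

    /** Number of instances currently available. */
    int idleCount() {
        return idle.size();
    }
}
```

With one pool per ruleset, each thread would get a WorkingMemory to itself for the duration of a routing call, so up to the pool size messages could be routed in parallel instead of queuing on a single lock. The work passed to withSession would still need to retract the message before returning.
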
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3984660#3984660
