I have a few concerns with the router. Hopefully these aren't real issues and it's just
me being naive.
Is the way WorkingMemory instances are cached thread safe? I assume (maybe wrongly?) that the
ESB is multi-threaded, that is, it can process multiple messages at once on separate
threads. I see two problems here. If two messages come in for routing at once, both with
the same ruleset, the two threads could walk all over each other. And if both threads are
sharing the same WorkingMemory object, how can you be sure which message or
destinationServices list the rules are operating on? Below is a patch that addresses the
issue by synchronizing on the WorkingMemory. One alternative would be to pool
WorkingMemory objects. I also fixed what I think is a memory leak: the Message is never
retracted, so it is retained in the WorkingMemory forever.
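To make the second concern concrete, here is a small sketch that replays one unlucky
interleaving of two routing calls on a single thread, so the outcome is deterministic.
SharedSession is a made-up stand-in with just enough behaviour to show the problem; it is
not the Drools WorkingMemory API, and the "rule" in it is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a cached, shared rules session (NOT the Drools API).
class SharedSession {
    private List<String> destinations;  // plays the role of the "destinationServices" global
    private final List<String> facts = new ArrayList<String>();

    void setGlobal(List<String> destinations) { this.destinations = destinations; }

    void assertObject(String message) { facts.add(message); }

    // Invented "rule": route every asserted message to a service named after it.
    void fireAllRules() {
        for (String fact : facts) {
            destinations.add("service-for-" + fact);
        }
    }
}

class SharedSessionHazard {
    // Replays, in order, the steps two unsynchronized threads A and B might take.
    static List<List<String>> replay() {
        SharedSession session = new SharedSession();
        List<String> listA = new ArrayList<String>();
        List<String> listB = new ArrayList<String>();

        session.setGlobal(listA);    // thread A sets its result list
        session.assertObject("A");   // thread A asserts its message
        session.setGlobal(listB);    // thread B replaces A's result list
        session.assertObject("B");   // thread B asserts its message
        session.fireAllRules();      // thread A fires: results land in B's list

        List<List<String>> result = new ArrayList<List<String>>();
        result.add(listA);
        result.add(listB);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> r = replay();
        System.out.println("A's list: " + r.get(0)); // prints "A's list: []"
        System.out.println("B's list: " + r.get(1)); // prints "B's list: [service-for-A, service-for-B]"
    }
}
```

In this replay thread A ends up with an empty list and thread B gets both destinations.
Message "A" also stays in the session afterwards, which is the leak I mentioned.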
| Index: F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
| ===================================================================
| --- F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java (revision 7517)
| +++ F:/dev/jbossesb-workspace/JBossESB/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java (working copy)
| @@ -25,12 +25,13 @@
| import java.io.InputStreamReader;
| import java.io.Reader;
| import java.util.ArrayList;
| -import java.util.HashMap;
| import java.util.List;
| import java.util.Map;
| +import java.util.concurrent.ConcurrentHashMap;
|
| import org.apache.log4j.Logger;
| import org.apache.log4j.Priority;
| +import org.drools.FactHandle;
| import org.drools.RuleBase;
| import org.drools.RuleBaseFactory;
| import org.drools.WorkingMemory;
| @@ -49,7 +50,7 @@
| */
| public class JBossRulesRouter extends ContentBasedRouter
| {
| - private static Map<String,WorkingMemory> workingMemories=new HashMap<String,WorkingMemory>();
| + private static Map<String,WorkingMemory> workingMemories=new ConcurrentHashMap<String,WorkingMemory>();
| private static Logger logger = Logger.getLogger(JBossRulesRouter.class);
| /**
| * Route the message, where the routing rules are supplied as part of
| @@ -87,20 +88,22 @@
| {
| List<String> destinationServices = new ArrayList<String>();
| try {
| - if (!workingMemories.containsKey(ruleSet)) {
| + WorkingMemory workingMemory = workingMemories.get(ruleSet);
| + if (workingMemory == null) {
| logger.log(Priority.INFO, "Reading ruleSet from file=" + ruleSet);
| RuleBase ruleBase = readRuleBase(ruleSet, ruleLanguage);
| - WorkingMemory workingMemory = ruleBase.newWorkingMemory();
| + workingMemory = ruleBase.newWorkingMemory();
| workingMemories.put(ruleSet, workingMemory);
| }
| logger.log(Priority.DEBUG, "Obtained message=" + message + " with ruleSet=" + ruleSet);
| - WorkingMemory workingMemory = workingMemories.get(ruleSet);
| - workingMemory.setGlobal("destinationServices", destinationServices);
| - workingMemory.assertObject(message);
| - logger.log(Priority.INFO, "Fire the rules engine");
| - workingMemory.fireAllRules();
| + synchronized (workingMemory) {
| + workingMemory.setGlobal("destinationServices", destinationServices);
| + FactHandle factHandle = workingMemory.assertObject(message);
| + logger.log(Priority.INFO, "Fire the rules engine");
| + workingMemory.fireAllRules();
| + workingMemory.retractObject(factHandle);
| + }
| //Now route there, later we will implement an option to place a callback.
| - destinationServices = (List) workingMemory.getGlobal("destinationServices");
| logger.log(Priority.DEBUG, "Destination Services List: " + destinationServices);
| Boolean deliverMessages = (Boolean) message.getProperties().getProperty(MessageRouter.DELIVER_MESSAGES);
| //Only actuall deliver the message if this is set in the message
|
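For comparison, the pooling alternative I mentioned might look something like the rough
sketch below. It is not part of the patch. SessionPool and SessionFactory are names I made
up for illustration; in the router the pooled type would be WorkingMemory and the factory
would call something like ruleBase.newWorkingMemory(). The pool is unbounded, which is an
assumption that may not suit a real deployment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical factory for whatever is being pooled.
interface SessionFactory<T> {
    T create(String ruleSet);
}

// Keeps idle sessions per ruleSet so that no two threads share one at the same time.
class SessionPool<T> {
    private final ConcurrentMap<String, ConcurrentLinkedQueue<T>> idle =
            new ConcurrentHashMap<String, ConcurrentLinkedQueue<T>>();
    private final SessionFactory<T> factory;

    SessionPool(SessionFactory<T> factory) {
        this.factory = factory;
    }

    // putIfAbsent makes sure all threads agree on a single queue per ruleSet.
    private ConcurrentLinkedQueue<T> queueFor(String ruleSet) {
        ConcurrentLinkedQueue<T> queue = idle.get(ruleSet);
        if (queue == null) {
            ConcurrentLinkedQueue<T> fresh = new ConcurrentLinkedQueue<T>();
            queue = idle.putIfAbsent(ruleSet, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    // Hands out an idle session if one exists, otherwise creates a new one.
    T borrow(String ruleSet) {
        T session = queueFor(ruleSet).poll();
        return session != null ? session : factory.create(ruleSet);
    }

    // Returns a session to the pool; the caller must have cleaned it up first.
    void giveBack(String ruleSet, T session) {
        queueFor(ruleSet).offer(session);
    }
}
```

The caller would borrow a session, set the global, assert the message and fire the rules
inside a try block, then retract the message and call giveBack in a finally block so the
session goes back clean even if the rules throw. Compared with the synchronized block in
the patch, this lets messages for the same ruleset be routed in parallel at the cost of
holding several sessions per ruleset.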