[jboss-svn-commits] JBL Code SVN: r34268 - in labs/jbossesb/trunk/product: services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Jul 28 16:34:35 EDT 2010
Author: dward
Date: 2010-07-28 16:34:34 -0400 (Wed, 28 Jul 2010)
New Revision: 34268
Modified:
labs/jbossesb/trunk/product/.classpath
labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java
labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java
labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java
Log:
Fix for JBESB-3397 ( https://jira.jboss.org/browse/JBESB-3397 ).
Modified: labs/jbossesb/trunk/product/.classpath
===================================================================
--- labs/jbossesb/trunk/product/.classpath 2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/.classpath 2010-07-28 20:34:34 UTC (rev 34268)
@@ -70,7 +70,7 @@
<classpathentry kind="lib" path="build/lib/jboss-deployers-core-spi-2.0.7.GA.jar"/>
<classpathentry kind="lib" path="build/lib/jboss-deployers-impl-2.0.7.GA.jar"/>
<classpathentry kind="lib" path="build/lib/jboss-deployers-spi-2.0.7.GA.jar"/>
- <classpathentry kind="lib" path="build/lib/jboss-deployers-structure-spi-2.0.7.GA.jar" sourcepath="/jboss-deployers-structure-spi"/>
+ <classpathentry kind="lib" path="build/lib/jboss-deployers-structure-spi-2.0.7.GA.jar"/>
<classpathentry kind="lib" path="build/lib/jboss-deployers-vfs-2.0.7.GA.jar"/>
<classpathentry kind="lib" path="build/lib/jboss-deployers-vfs-spi-2.0.7.GA.jar"/>
<classpathentry kind="lib" path="build/lib/jboss-kernel-2.0.6.GA.jar"/>
Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java 2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java 2010-07-28 20:34:34 UTC (rev 34268)
@@ -322,7 +322,7 @@
count = actionCounterHash.get(messageProfile + " " + MESSAGE_COUNTER);
count = count.intValue() + 1;
} else {
- count = new Integer(1);
+ count = Integer.valueOf(1);
}
actionCounterHash.put(messageProfile + " " + MESSAGE_COUNTER, count);
@@ -339,7 +339,7 @@
count = actionFailedCounterHash.get(messageProfile + " " + FAILED_MESSAGE_COUNTER);
count = count.intValue() + 1;
} else {
- count = new Integer(1);
+ count = Integer.valueOf(1);
}
actionFailedCounterHash.put(messageProfile + " " + FAILED_MESSAGE_COUNTER, count);
}
Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java 2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java 2010-07-28 20:34:34 UTC (rev 34268)
@@ -97,6 +97,10 @@
*/
private KnowledgeRuntimeLogger statefulRuntimeLogger;
/**
+ * The stateful fireUntilHalt thread.
+ */
+ private Thread statefulFireUntilHaltThread;
+ /**
* The stateful session lock.
*/
private final Lock statefulSessionLock = new ReentrantLock();
@@ -152,7 +156,7 @@
if (head != null)
{
statelessSession = head;
- sid = generateId(statelessSession);
+ sid = getId(statelessSession);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("reusing old stateles session [" + sid + "]");
@@ -161,7 +165,7 @@
else
{
statelessSession = ruleBase.newStatelessKnowledgeSession();
- sid = generateId(statelessSession);
+ sid = getId(statelessSession);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("created new stateless session [" + sid + "]");
@@ -187,7 +191,7 @@
KnowledgeRuntimeLogger statelessRuntimeLogger = getRuntimeLogger(ruleInfo, statelessSession);
if (statelessRuntimeLogger != null && LOGGER.isDebugEnabled())
{
- LOGGER.debug("created new runtime logger [" + generateId(statelessRuntimeLogger) + "]");
+ LOGGER.debug("created new runtime logger [" + getId(statelessRuntimeLogger) + "]");
}
if (LOGGER.isTraceEnabled())
{
@@ -209,7 +213,7 @@
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("calling close() on runtime logger [" + generateId(statelessRuntimeLogger) + "]");
+ LOGGER.debug("calling close() on runtime logger [" + getId(statelessRuntimeLogger) + "]");
}
statelessRuntimeLogger.close();
}
@@ -251,29 +255,46 @@
{
if (statefulSession != null && !continueState)
{
- final String ssid = generateId(statefulSession, statefulSessionCount);
+ final String ssid = getId(statefulSession, statefulSessionCount);
final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+ final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
statefulSession = null;
+ statefulFireUntilHaltThread = null;
statefulRuntimeLogger = null;
- if (fireUntilHalt)
+ // Maybe halt the session
+ if (haltedStatefulFireUntilHaltThread != null)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling halt on stateful session [" + ssid + "] - no continue set");
}
disposedStatefulSession.halt();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+ }
+ try
+ {
+ haltedStatefulFireUntilHaltThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+ }
}
+ // Always dispose the session
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling dispose() on stateful session [" + ssid + "] - no continue set");
}
disposedStatefulSession.dispose();
+ // Maybe close the logger
if (closedStatefulRuntimeLogger != null)
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "] - no continue set");
+ LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "] - no continue set");
}
closedStatefulRuntimeLogger.close();
}
@@ -287,7 +308,7 @@
KnowledgeSessionConfiguration statefulSessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
setClockType(ruleInfo, statefulSessionConfiguration);
statefulSession = ruleBase.newStatefulKnowledgeSession(statefulSessionConfiguration, EnvironmentFactory.newEnvironment());
- ssid = generateId(statefulSession, statefulSessionCount);
+ ssid = getId(statefulSession, statefulSessionCount);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("created new stateful session [" + ssid + "]");
@@ -296,13 +317,13 @@
statefulRuntimeLogger = getRuntimeLogger(ruleInfo, statefulSession);
if (statefulRuntimeLogger != null && LOGGER.isDebugEnabled())
{
- LOGGER.debug("created new runtime logger [" + generateId(statefulRuntimeLogger, statefulSessionCount) + "]");
+ LOGGER.debug("created new runtime logger [" + getId(statefulRuntimeLogger, statefulSessionCount) + "]");
}
}
else
{
isStatefulSessionNew = false;
- ssid = generateId(statefulSession, statefulSessionCount);
+ ssid = getId(statefulSession, statefulSessionCount);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("reusing old stateful session [" + ssid + "]");
@@ -337,7 +358,7 @@
LOGGER.debug("event [" + ef.getObject().getClass().getName() + "], startTimeStamp [" + ef.getStartTimestamp() + "]");
}
- // Always insert the Default facts
+ // Always insert the default facts (into the main WorkingMemory; no entry-point specified)
final List<Object> defaultFacts = ruleInfo.getDefaultFacts();
if (defaultFacts != null)
{
@@ -351,14 +372,15 @@
}
}
- // Insert any object that did not specify an entry-point into the main working memory.
- if (ruleInfo.getFacts() != null)
+ // Maybe insert entry point facts (into a named WorkingMemoryEntryPoint)
+ final Map<String,List<Object>> facts = ruleInfo.getFacts();
+ if (facts != null)
{
if (LOGGER.isTraceEnabled())
{
- LOGGER.trace("calling insert(Object) on stateful session [" + ssid + "] for each fact");
+ LOGGER.trace("calling insert(Object) on stateful session [" + ssid + "] for each entry point fact");
}
- for(Entry<String, List<Object>> entry : ruleInfo.getFacts().entrySet())
+ for(Entry<String, List<Object>> entry : facts.entrySet())
{
String entryPointName = entry.getKey();
// Insert objects that have explicitly specified an entry-point.
@@ -367,9 +389,9 @@
{
throw new RuleServiceException("The entry-point '" + entryPointName + "' was not found in the current stateful session. Please double check your rules source");
}
- for(Object object : entry.getValue())
+ for(Object fact : entry.getValue())
{
- wmep.insert(object);
+ wmep.insert(fact);
}
}
}
@@ -387,31 +409,36 @@
{
final String threadName = new StringBuilder()
.append(getClass().getSimpleName())
- .append(":fireUntilHalt[")
- .append(ssid).append("]")
+ .append(":fireUntilHalt(")
+ .append(ssid)
+ .append(")")
.toString();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("calling fireUntilHalt() on stateful session [" + ssid + "] in new thread [" + threadName + "]");
+ LOGGER.debug("spawning fireUntilHalt() on stateful session [" + ssid + "] in thread [" + threadName + "]");
}
final ClassLoader goodClassLoader = Thread.currentThread().getContextClassLoader();
- final Thread thread = new Thread(new Runnable() {
+ statefulFireUntilHaltThread = new Thread(new Runnable() {
public void run() {
Thread thread = Thread.currentThread();
ClassLoader origClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(goodClassLoader);
try {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("calling fireUntilHalt() on stateful session [" + ssid + "] in thread [" + threadName + "]");
+ }
statefulSession.fireUntilHalt();
} catch (NullPointerException npe) {
- LOGGER.warn("fireUntilHalt() will not be called on stateful session [" + ssid + "] in thread [" + threadName + "] - halt() was already called", npe);
+ LOGGER.warn("fireUntilHalt() not called on stateful session [" + ssid + "] in thread [" + threadName + "] - already halt()ed and dispose()d: " + npe.getMessage());
} finally {
thread.setContextClassLoader(origClassLoader);
}
}
});
- thread.setName(threadName);
- thread.setDaemon(true);
- thread.start();
+ statefulFireUntilHaltThread.setName(threadName);
+ statefulFireUntilHaltThread.setDaemon(true);
+ statefulFireUntilHaltThread.start();
}
else if (LOGGER.isDebugEnabled())
{
@@ -423,27 +450,44 @@
if (dispose)
{
final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+ final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
statefulSession = null;
+ statefulFireUntilHaltThread = null;
statefulRuntimeLogger = null;
- if (fireUntilHalt)
+ // Maybe halt the session
+ if (haltedStatefulFireUntilHaltThread != null)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling halt() on stateful session [" + ssid + "]");
}
disposedStatefulSession.halt();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+ }
+ try
+ {
+ haltedStatefulFireUntilHaltThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+ }
}
+ // Always dispose the session
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling dispose() on stateful session [" + ssid + "]");
}
disposedStatefulSession.dispose();
+ // Maybe close the logger
if (closedStatefulRuntimeLogger != null)
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
+ LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
}
closedStatefulRuntimeLogger.close();
}
@@ -462,7 +506,7 @@
statefulSessionCount--;
if (disposed && statefulSessionCount == 0)
{
- dispose(fireUntilHalt);
+ dispose();
}
statefulSessionCountLock.unlock();
}
@@ -471,41 +515,52 @@
void dispose()
{
- // in cases where we're not sure, pass "true" to be safe
- dispose(true);
- }
-
- void dispose(boolean fireUntilHalt)
- {
statefulSessionCountLock.lock();
try
{
disposed = true;
if ((statefulSessionCount == 0) && (statefulSession != null))
{
- final String ssid = generateId(statefulSession, statefulSessionCount);
+ final String ssid = getId(statefulSession, statefulSessionCount);
final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+ final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
statefulSession = null;
+ statefulFireUntilHaltThread = null;
statefulRuntimeLogger = null;
- if (fireUntilHalt)
+ // Maybe halt the session
+ if (haltedStatefulFireUntilHaltThread != null)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling halt() on stateful session [" + ssid + "]");
}
disposedStatefulSession.halt();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+ }
+ try
+ {
+ haltedStatefulFireUntilHaltThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+ }
}
+ // Always dispose the session
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("calling dispose() on stateful session [" + ssid + "]");
}
disposedStatefulSession.dispose();
+ // Maybe close the logger
if (closedStatefulRuntimeLogger != null)
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
+ LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
}
closedStatefulRuntimeLogger.close();
}
@@ -543,7 +598,7 @@
{
String channel_name = channel_entry.getKey();
Channel channel = channel_entry.getValue();
- if (channel_name != null && channel_entry != null)
+ if (channel_name != null && channel != null)
{
session.registerChannel(channel_name, channel);
}
@@ -586,15 +641,15 @@
return null;
}
- private final String generateId(final Object object)
+ private final String getId(final Object object)
{
- return generateId(object, 0);
+ return String.valueOf(System.identityHashCode(object));
}
- private final String generateId(final Object object, final int count)
+ private final String getId(final Object object, final int count)
{
return new StringBuilder()
- .append(System.identityHashCode(object))
+ .append(getId(object))
.append(":")
.append(count)
.toString();
Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java 2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java 2010-07-28 20:34:34 UTC (rev 34268)
@@ -21,8 +21,6 @@
*/
package org.jboss.internal.soa.esb.services.rules;
-import static org.jboss.internal.soa.esb.services.rules.RuleServiceCallHelper.isFireUntilHalt;
-
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -138,7 +136,7 @@
*/
public Message executeStatefulRules(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
{
- AssertArgument.isNotNull(ruleInfo, "ruleInfo" );
+ AssertArgument.isNotNull(ruleInfo, "ruleInfo");
final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForFileBasedRules(ruleInfo);
return ruleBaseState.executeStatefulRules(ruleInfo, msg);
}
@@ -154,16 +152,17 @@
*/
public Message executeStatefulRulesFromDecisionTable(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
{
+ AssertArgument.isNotNull(ruleInfo, "ruleInfo");
final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForDecisionTable(ruleInfo);
return ruleBaseState.executeStatefulRules(ruleInfo, msg);
}
public Message executeStatefulRulesFromRuleAgent(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
{
+ AssertArgument.isNotNull(ruleInfo, "ruleInfo");
try
{
final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForRuleAgent(ruleInfo) ;
-
return ruleBaseState.executeStatefulRules(ruleInfo, msg);
}
catch (RuleServiceException e)
@@ -180,11 +179,11 @@
public Message continueStatefulRulesExecution(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
{
+ AssertArgument.isNotNull(ruleInfo, "ruleInfo");
try
{
final Map<String, DroolsRuleBaseState> ruleBaseStates = getCachedRuleBaseStates();
final DroolsRuleBaseState ruleBaseState = ruleBaseStates.get(ruleInfo.getRuleSource());
-
return ruleBaseState.executeStatefulRules(ruleInfo, msg);
}
catch (Exception e)
@@ -232,7 +231,7 @@
final DroolsRuleBaseState ruleBaseState = getCachedRuleBaseStates().remove(ruleSet);
if (ruleBaseState != null)
{
- ruleBaseState.dispose(isFireUntilHalt(ruleInfo)) ;
+ ruleBaseState.dispose() ;
}
}
}
@@ -296,7 +295,7 @@
final DroolsRuleBaseState origRuleBasedState = ruleBaseStates.put( decisionTable, ruleBaseState );
if (origRuleBasedState != null)
{
- origRuleBasedState.dispose(isFireUntilHalt(ruleInfo)) ;
+ origRuleBasedState.dispose() ;
}
}
}
@@ -358,7 +357,7 @@
{
if (ruleBaseState != null)
{
- ruleBaseState.dispose(isFireUntilHalt(ruleInfo)) ;
+ ruleBaseState.dispose() ;
}
ruleBaseState = new DroolsRuleBaseState(currentRuleBase) ;
ruleBaseStates.put(ruleAgentProperties, ruleBaseState) ;
More information about the jboss-svn-commits mailing list