[jboss-svn-commits] JBL Code SVN: r34268 - in labs/jbossesb/trunk/product: services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Jul 28 16:34:35 EDT 2010


Author: dward
Date: 2010-07-28 16:34:34 -0400 (Wed, 28 Jul 2010)
New Revision: 34268

Modified:
   labs/jbossesb/trunk/product/.classpath
   labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java
   labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java
   labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java
Log:
Fix for JBESB-3397 ( https://jira.jboss.org/browse/JBESB-3397 ).


Modified: labs/jbossesb/trunk/product/.classpath
===================================================================
--- labs/jbossesb/trunk/product/.classpath	2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/.classpath	2010-07-28 20:34:34 UTC (rev 34268)
@@ -70,7 +70,7 @@
 	<classpathentry kind="lib" path="build/lib/jboss-deployers-core-spi-2.0.7.GA.jar"/>
 	<classpathentry kind="lib" path="build/lib/jboss-deployers-impl-2.0.7.GA.jar"/>
 	<classpathentry kind="lib" path="build/lib/jboss-deployers-spi-2.0.7.GA.jar"/>
-	<classpathentry kind="lib" path="build/lib/jboss-deployers-structure-spi-2.0.7.GA.jar" sourcepath="/jboss-deployers-structure-spi"/>
+	<classpathentry kind="lib" path="build/lib/jboss-deployers-structure-spi-2.0.7.GA.jar"/>
 	<classpathentry kind="lib" path="build/lib/jboss-deployers-vfs-2.0.7.GA.jar"/>
 	<classpathentry kind="lib" path="build/lib/jboss-deployers-vfs-spi-2.0.7.GA.jar"/>
 	<classpathentry kind="lib" path="build/lib/jboss-kernel-2.0.6.GA.jar"/>

Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java	2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/routing/cbr/JBRulesCounter.java	2010-07-28 20:34:34 UTC (rev 34268)
@@ -322,7 +322,7 @@
 				count = actionCounterHash.get(messageProfile + " " + MESSAGE_COUNTER);
 				count = count.intValue() + 1;
 			} else {
-				count = new Integer(1);
+				count = Integer.valueOf(1);
 			}
 			actionCounterHash.put(messageProfile + " " + MESSAGE_COUNTER, count);
 			
@@ -339,7 +339,7 @@
 				count = actionFailedCounterHash.get(messageProfile + " " + FAILED_MESSAGE_COUNTER);
 				count = count.intValue() + 1; 
 			} else {
-				count = new Integer(1);
+				count = Integer.valueOf(1);
 			}
 			actionFailedCounterHash.put(messageProfile + " " + FAILED_MESSAGE_COUNTER, count);
 		}		

Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java	2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleBaseState.java	2010-07-28 20:34:34 UTC (rev 34268)
@@ -97,6 +97,10 @@
 	 */
 	private KnowledgeRuntimeLogger statefulRuntimeLogger;
 	/**
+	 * The stateful fireUntilHalt thread.
+	 */
+	private Thread statefulFireUntilHaltThread;
+	/**
 	 * The stateful session lock.
 	 */
 	private final Lock statefulSessionLock = new ReentrantLock();
@@ -152,7 +156,7 @@
 		if (head != null)
 		{
 			statelessSession = head;
-			sid = generateId(statelessSession);
+			sid = getId(statelessSession);
 			if (LOGGER.isDebugEnabled())
 			{
 				LOGGER.debug("reusing old stateles session [" + sid + "]");
@@ -161,7 +165,7 @@
 		else
 		{
 			statelessSession = ruleBase.newStatelessKnowledgeSession();
-			sid = generateId(statelessSession);
+			sid = getId(statelessSession);
 			if (LOGGER.isDebugEnabled())
 			{
 				LOGGER.debug("created new stateless session [" + sid + "]");
@@ -187,7 +191,7 @@
 			KnowledgeRuntimeLogger statelessRuntimeLogger = getRuntimeLogger(ruleInfo, statelessSession);
 			if (statelessRuntimeLogger != null && LOGGER.isDebugEnabled())
 			{
-				LOGGER.debug("created new runtime logger [" + generateId(statelessRuntimeLogger) + "]");
+				LOGGER.debug("created new runtime logger [" + getId(statelessRuntimeLogger) + "]");
 			}
 			if (LOGGER.isTraceEnabled())
 			{
@@ -209,7 +213,7 @@
 				{
 					if (LOGGER.isDebugEnabled())
 					{
-						LOGGER.debug("calling close() on runtime logger [" + generateId(statelessRuntimeLogger) + "]");
+						LOGGER.debug("calling close() on runtime logger [" + getId(statelessRuntimeLogger) + "]");
 					}
 					statelessRuntimeLogger.close();
 				}
@@ -251,29 +255,46 @@
 			{
 				if (statefulSession != null && !continueState)
 				{
-					final String ssid = generateId(statefulSession, statefulSessionCount);
+					final String ssid = getId(statefulSession, statefulSessionCount);
 					final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+					final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
 					final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
 					statefulSession = null;
+					statefulFireUntilHaltThread = null;
 					statefulRuntimeLogger = null;
-					if (fireUntilHalt)
+					// Maybe halt the session
+					if (haltedStatefulFireUntilHaltThread != null)
 					{
 						if (LOGGER.isDebugEnabled())
 						{
 							LOGGER.debug("calling halt on stateful session [" + ssid + "] - no continue set");
 						}
 						disposedStatefulSession.halt();
+						if (LOGGER.isDebugEnabled())
+						{
+							LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+						}
+						try
+						{
+							haltedStatefulFireUntilHaltThread.join();
+						}
+						catch (InterruptedException ie)
+						{
+							LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+						}
 					}
+					// Always dispose the session
 					if (LOGGER.isDebugEnabled())
 					{
 						LOGGER.debug("calling dispose() on stateful session [" + ssid + "] - no continue set");
 					}
 					disposedStatefulSession.dispose();
+					// Maybe close the logger
 					if (closedStatefulRuntimeLogger != null)
 					{
 						if (LOGGER.isDebugEnabled())
 						{
-							LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "] - no continue set");
+							LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "] - no continue set");
 						}
 						closedStatefulRuntimeLogger.close();
 					}
@@ -287,7 +308,7 @@
 					KnowledgeSessionConfiguration statefulSessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
 					setClockType(ruleInfo, statefulSessionConfiguration);
 					statefulSession = ruleBase.newStatefulKnowledgeSession(statefulSessionConfiguration, EnvironmentFactory.newEnvironment());
-					ssid = generateId(statefulSession, statefulSessionCount);
+					ssid = getId(statefulSession, statefulSessionCount);
 					if (LOGGER.isDebugEnabled())
 					{
 						LOGGER.debug("created new stateful session [" + ssid + "]");
@@ -296,13 +317,13 @@
 					statefulRuntimeLogger = getRuntimeLogger(ruleInfo, statefulSession);
 					if (statefulRuntimeLogger != null && LOGGER.isDebugEnabled())
 					{
-						LOGGER.debug("created new runtime logger [" + generateId(statefulRuntimeLogger, statefulSessionCount) + "]");
+						LOGGER.debug("created new runtime logger [" + getId(statefulRuntimeLogger, statefulSessionCount) + "]");
 					}
 				}
 				else
 				{
 					isStatefulSessionNew = false;
-					ssid = generateId(statefulSession, statefulSessionCount);
+					ssid = getId(statefulSession, statefulSessionCount);
 					if (LOGGER.isDebugEnabled())
 					{
 						LOGGER.debug("reusing old stateful session [" + ssid + "]");
@@ -337,7 +358,7 @@
 						LOGGER.debug("event [" + ef.getObject().getClass().getName() + "], startTimeStamp [" + ef.getStartTimestamp() + "]");
 					}
 
-					// Always insert the Default facts
+					// Always insert the default facts (into the main WorkingMemory; no entry-point specified)
 					final List<Object> defaultFacts = ruleInfo.getDefaultFacts();
 					if (defaultFacts != null)
 					{
@@ -351,14 +372,15 @@
 						}
 					}
 
-					// Insert any object that did not specify an entry-point into the main working memory.
-					if (ruleInfo.getFacts() != null)
+					// Maybe insert entry point facts (into a named WorkingMemoryEntryPoint)
+					final Map<String,List<Object>> facts = ruleInfo.getFacts();
+					if (facts != null)
 					{
 						if (LOGGER.isTraceEnabled())
 						{
-							LOGGER.trace("calling insert(Object) on stateful session [" + ssid + "] for each fact");
+							LOGGER.trace("calling insert(Object) on stateful session [" + ssid + "] for each entry point fact");
 						}
-						for(Entry<String, List<Object>> entry : ruleInfo.getFacts().entrySet())
+						for(Entry<String, List<Object>> entry : facts.entrySet())
 						{
 							String entryPointName = entry.getKey();
 							// Insert objects that have explicitly specified an entry-point.
@@ -367,9 +389,9 @@
 							{
 								throw new RuleServiceException("The entry-point '" + entryPointName + "' was not found in the current stateful session. Please double check your rules source");
 							}
-							for(Object object : entry.getValue())
+							for(Object fact : entry.getValue())
 							{
-								wmep.insert(object);
+								wmep.insert(fact);
 							}
 						}
 					}
@@ -387,31 +409,36 @@
 					{
 						final String threadName = new StringBuilder()
 							.append(getClass().getSimpleName())
-							.append(":fireUntilHalt[")
-							.append(ssid).append("]")
+							.append(":fireUntilHalt(")
+							.append(ssid)
+							.append(")")
 							.toString();
 						if (LOGGER.isDebugEnabled())
 						{
-							LOGGER.debug("calling fireUntilHalt() on stateful session [" + ssid + "] in new thread [" + threadName + "]");
+							LOGGER.debug("spawning fireUntilHalt() on stateful session [" + ssid + "] in thread [" + threadName + "]");
 						}
 						final ClassLoader goodClassLoader = Thread.currentThread().getContextClassLoader();
-						final Thread thread = new Thread(new Runnable() {
+						statefulFireUntilHaltThread = new Thread(new Runnable() {
 							public void run() {
 								Thread thread = Thread.currentThread();
 								ClassLoader origClassLoader = thread.getContextClassLoader();
 								thread.setContextClassLoader(goodClassLoader);
 								try {
+									if (LOGGER.isDebugEnabled())
+									{
+										LOGGER.debug("calling fireUntilHalt() on stateful session [" + ssid + "] in thread [" + threadName + "]");
+									}
 									statefulSession.fireUntilHalt();
 								} catch (NullPointerException npe) {
-									LOGGER.warn("fireUntilHalt() will not be called on stateful session [" + ssid + "] in thread [" + threadName + "] - halt() was already called", npe);
+									LOGGER.warn("fireUntilHalt() not called on stateful session [" + ssid + "] in thread [" + threadName + "] - already halt()ed and dispose()d: " + npe.getMessage());
 								} finally {
 									thread.setContextClassLoader(origClassLoader);
 								}
 							}
 						});
-						thread.setName(threadName);
-						thread.setDaemon(true);
-						thread.start();
+						statefulFireUntilHaltThread.setName(threadName);
+						statefulFireUntilHaltThread.setDaemon(true);
+						statefulFireUntilHaltThread.start();
 					}
 					else if (LOGGER.isDebugEnabled())
 					{
@@ -423,27 +450,44 @@
 					if (dispose)
 					{
 						final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+						final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
 						final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
 						statefulSession = null;
+						statefulFireUntilHaltThread = null;
 						statefulRuntimeLogger = null;
-						if (fireUntilHalt)
+						// Maybe halt the session
+						if (haltedStatefulFireUntilHaltThread != null)
 						{
 							if (LOGGER.isDebugEnabled())
 							{
 								LOGGER.debug("calling halt() on stateful session [" + ssid + "]");
 							}
 							disposedStatefulSession.halt();
+							if (LOGGER.isDebugEnabled())
+							{
+								LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+							}
+							try
+							{
+								haltedStatefulFireUntilHaltThread.join();
+							}
+							catch (InterruptedException ie)
+							{
+								LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+							}
 						}
+						// Always dispose the session
 						if (LOGGER.isDebugEnabled())
 						{
 							LOGGER.debug("calling dispose() on stateful session [" + ssid + "]");
 						}
 						disposedStatefulSession.dispose();
+						// Maybe close the logger
 						if (closedStatefulRuntimeLogger != null)
 						{
 							if (LOGGER.isDebugEnabled())
 							{
-								LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
+								LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
 							}
 							closedStatefulRuntimeLogger.close();
 						}
@@ -462,7 +506,7 @@
 			statefulSessionCount--;
 			if (disposed && statefulSessionCount == 0)
 			{
-				dispose(fireUntilHalt);
+				dispose();
 			}
 			statefulSessionCountLock.unlock();
 		}
@@ -471,41 +515,52 @@
 
 	void dispose()
 	{
-		// in cases where we're not sure, pass "true" to be safe
-		dispose(true);
-	}
-
-	void dispose(boolean fireUntilHalt)
-	{
 		statefulSessionCountLock.lock();
 		try
 		{
 			disposed = true;
 			if ((statefulSessionCount == 0) && (statefulSession != null))
 			{
-				final String ssid = generateId(statefulSession, statefulSessionCount);
+				final String ssid = getId(statefulSession, statefulSessionCount);
 				final StatefulKnowledgeSession disposedStatefulSession = statefulSession;
+				final Thread haltedStatefulFireUntilHaltThread = statefulFireUntilHaltThread;
 				final KnowledgeRuntimeLogger closedStatefulRuntimeLogger = statefulRuntimeLogger;
 				statefulSession = null;
+				statefulFireUntilHaltThread = null;
 				statefulRuntimeLogger = null;
-				if (fireUntilHalt)
+				// Maybe halt the session
+				if (haltedStatefulFireUntilHaltThread != null)
 				{
 					if (LOGGER.isDebugEnabled())
 					{
 						LOGGER.debug("calling halt() on stateful session [" + ssid + "]");
 					}
 					disposedStatefulSession.halt();
+					if (LOGGER.isDebugEnabled())
+					{
+						LOGGER.debug("joining thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]");
+					}
+					try
+					{
+						haltedStatefulFireUntilHaltThread.join();
+					}
+					catch (InterruptedException ie)
+					{
+						LOGGER.error("interrupted thread [" + haltedStatefulFireUntilHaltThread.getName() + "] for stateful session [" + ssid + "]", ie);
+					}
 				}
+				// Always dispose the session
 				if (LOGGER.isDebugEnabled())
 				{
 					LOGGER.debug("calling dispose() on stateful session [" + ssid + "]");
 				}
 				disposedStatefulSession.dispose();
+				// Maybe close the logger
 				if (closedStatefulRuntimeLogger != null)
 				{
 					if (LOGGER.isDebugEnabled())
 					{
-						LOGGER.debug("calling close() on runtime logger [" + generateId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
+						LOGGER.debug("calling close() on runtime logger [" + getId(closedStatefulRuntimeLogger, statefulSessionCount) + "]");
 					}
 					closedStatefulRuntimeLogger.close();
 				}
@@ -543,7 +598,7 @@
 			{
 				String channel_name = channel_entry.getKey();
 				Channel channel = channel_entry.getValue();
-				if (channel_name != null && channel_entry != null)
+				if (channel_name != null && channel != null)
 				{
 					session.registerChannel(channel_name, channel);
 				}
@@ -586,15 +641,15 @@
 		return null;
 	}
 
-	private final String generateId(final Object object)
+	private final String getId(final Object object)
 	{
-		return generateId(object, 0);
+		return String.valueOf(System.identityHashCode(object));
 	}
 
-	private final String generateId(final Object object, final int count)
+	private final String getId(final Object object, final int count)
 	{
 		return new StringBuilder()
-			.append(System.identityHashCode(object))
+			.append(getId(object))
 			.append(":")
 			.append(count)
 			.toString();

Modified: labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java	2010-07-28 19:51:31 UTC (rev 34267)
+++ labs/jbossesb/trunk/product/services/jbrules/src/main/java/org/jboss/internal/soa/esb/services/rules/DroolsRuleService.java	2010-07-28 20:34:34 UTC (rev 34268)
@@ -21,8 +21,6 @@
  */
 package org.jboss.internal.soa.esb.services.rules;
 
-import static org.jboss.internal.soa.esb.services.rules.RuleServiceCallHelper.isFireUntilHalt;
-
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -138,7 +136,7 @@
 	 */
 	public Message executeStatefulRules(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
     {
-		AssertArgument.isNotNull(ruleInfo, "ruleInfo" );
+		AssertArgument.isNotNull(ruleInfo, "ruleInfo");
 		final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForFileBasedRules(ruleInfo);
 		return ruleBaseState.executeStatefulRules(ruleInfo, msg);
     }
@@ -154,16 +152,17 @@
 	 */
     public Message executeStatefulRulesFromDecisionTable(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
     {
+    	AssertArgument.isNotNull(ruleInfo, "ruleInfo");
 		final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForDecisionTable(ruleInfo);
 		return ruleBaseState.executeStatefulRules(ruleInfo, msg);
     }
 
 	public Message executeStatefulRulesFromRuleAgent(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
     {
+		AssertArgument.isNotNull(ruleInfo, "ruleInfo");
 	    try
         {
             final DroolsRuleBaseState ruleBaseState = getRuleBaseStateForRuleAgent(ruleInfo) ;
-
             return ruleBaseState.executeStatefulRules(ruleInfo, msg);
         }
         catch (RuleServiceException e)
@@ -180,11 +179,11 @@
 
     public Message continueStatefulRulesExecution(final StatefulRuleInfo ruleInfo, final Message msg) throws RuleServiceException
     {
+    	AssertArgument.isNotNull(ruleInfo, "ruleInfo");
 		try
 		{
 			final Map<String, DroolsRuleBaseState> ruleBaseStates = getCachedRuleBaseStates();
 			final DroolsRuleBaseState ruleBaseState = ruleBaseStates.get(ruleInfo.getRuleSource());
-
 			return ruleBaseState.executeStatefulRules(ruleInfo, msg);
 		}
 		catch (Exception e)
@@ -232,7 +231,7 @@
 						final DroolsRuleBaseState ruleBaseState = getCachedRuleBaseStates().remove(ruleSet);
 						if (ruleBaseState != null)
 						{
-							ruleBaseState.dispose(isFireUntilHalt(ruleInfo)) ;
+							ruleBaseState.dispose() ;
 						}
 					}
 				}
@@ -296,7 +295,7 @@
                         final DroolsRuleBaseState origRuleBasedState = ruleBaseStates.put( decisionTable, ruleBaseState );
                         if (origRuleBasedState != null)
                         {
-                            origRuleBasedState.dispose(isFireUntilHalt(ruleInfo)) ;
+                            origRuleBasedState.dispose() ;
                         }
                     }
                 }
@@ -358,7 +357,7 @@
 				{
 					if (ruleBaseState != null)
 					{
-						ruleBaseState.dispose(isFireUntilHalt(ruleInfo)) ;
+						ruleBaseState.dispose() ;
 					}
 					ruleBaseState = new DroolsRuleBaseState(currentRuleBase) ;
 					ruleBaseStates.put(ruleAgentProperties, ruleBaseState) ;



More information about the jboss-svn-commits mailing list