[jboss-svn-commits] JBL Code SVN: r7327 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 2 10:02:50 EST 2006
Author: jokum
Date: 2006-11-02 10:02:49 -0500 (Thu, 02 Nov 2006)
New Revision: 7327
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
Log:
HttpListener added to old architecture
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 14:25:35 UTC (rev 7326)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 15:02:49 UTC (rev 7327)
@@ -23,6 +23,8 @@
package org.jboss.soa.esb.listeners.old;
import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.actions.ActionDefinition;
@@ -63,6 +65,12 @@
protected ActionDefinitionFactory m_oActionDefinitionFactory;
protected MessageFactory m_oMsgFactory;
+ /* The name of the attribute in the configuration containing the number of threads in the pool*/
+ public static final String PARM_MAX_THREADS = "maxThreads";
+
+ /* The thread pool executing the action processing pipeline*/
+ protected ExecutorService pipelineExecutorPool = null;
+
protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
logger = Logger.getLogger(this.getClass());
@@ -75,11 +83,21 @@
String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
if(!sAtt.trim().equals("")) {
m_oActions = sAtt.split(",");
- }
+ }
- sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ //The number of threads configured
+ String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+ if (maxThreads != null) {
+ int iMax = Integer.parseInt(maxThreads);
+ this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ if (this.logger.isInfoEnabled()){
+ this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
+ }
+ } else {
+ this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
+ }
+ this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+
} // __________________________________
/**
@@ -95,21 +113,10 @@
try { Thread.sleep(500); }
catch(InterruptedException e) {/* ok do nothing */}
} else {
- for (Object currentObj : processList) {
- if (m_iQthr >= m_iMaxThr) {
- logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
- }
-
+ for (Object currentObj : processList) {
// Spawn a thread and push the message through the pipeline...
ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
- new Thread(runner).start();
- incThreads();
+ this.pipelineExecutorPool.submit(runner);
}
}
}
@@ -157,23 +164,9 @@
* <p/>
* Allows the listener to perform relevant close/cleanup tasks.
*/
- protected abstract void close();
+ protected abstract void close();
/**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
- }
-
- /**
- * Decrement the active thread count.
- */
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
* Action Processing Pipeline.
* <p/>
* Runs the actions in a listeners "actions" config on a message payload message received
@@ -182,7 +175,7 @@
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
* @since Version 4.0
*/
- private class ActionProcessingPipeline implements Runnable {
+ protected class ActionProcessingPipeline implements Runnable {
private Object initialObject;
@@ -190,7 +183,7 @@
* Private constructor.
* @param pMessage The initial processing target message.
*/
- private ActionProcessingPipeline(Object obj) {
+ protected ActionProcessingPipeline(Object obj) {
initialObject = obj;
}
@@ -253,10 +246,7 @@
} catch(Throwable thrown) {
processingError(initialObject, currentProcessor, thrown);
logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "]. Action [" + currentAction + "] threw an exception.", thrown);
- } finally {
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
+ }
}
/**
More information about the jboss-svn-commits
mailing list