[jboss-svn-commits] JBL Code SVN: r7343 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 2 14:51:24 EST 2006
Author: jokum
Date: 2006-11-02 14:51:22 -0500 (Thu, 02 Nov 2006)
New Revision: 7343
Modified:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
AbstractListenerUnitTest was failing after refactoring to thread pool
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 19:06:18 UTC (rev 7342)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 19:51:22 UTC (rev 7343)
@@ -65,12 +65,6 @@
protected ActionDefinitionFactory m_oActionDefinitionFactory;
protected MessageFactory m_oMsgFactory;
- /* The name of the attribute in the configuration containing the number of threads in the pool*/
- public static final String PARM_MAX_THREADS = "maxThreads";
-
- /* The thread pool executing the action processing pipeline*/
- protected ExecutorService pipelineExecutorPool = null;
-
protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
logger = Logger.getLogger(this.getClass());
@@ -83,21 +77,11 @@
String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
if(!sAtt.trim().equals("")) {
m_oActions = sAtt.split(",");
- }
+ }
- //The number of threads configured
- String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
- if (maxThreads != null) {
- int iMax = Integer.parseInt(maxThreads);
- this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
- if (this.logger.isInfoEnabled()){
- this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
- }
- } else {
- this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
- }
- this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
-
+ sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
} // __________________________________
/**
@@ -113,10 +97,21 @@
try { Thread.sleep(500); }
catch(InterruptedException e) {/* ok do nothing */}
} else {
- for (Object currentObj : processList) {
+ for (Object currentObj : processList) {
+ if (m_iQthr >= m_iMaxThr) {
+ logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return;
+ }
+ break;
+ }
+
// Spawn a thread and push the message message through the pipeline...
ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
- this.pipelineExecutorPool.submit(runner);
+ new Thread(runner).start();
+ incThreads();
}
}
}
@@ -164,9 +159,23 @@
* <p/>
* Allows the listener to perform relevant close/cleanup tasks.
*/
- protected abstract void close();
+ protected abstract void close();
/**
+ * Increment the active thread count.
+ */
+ private void incThreads() {
+ m_iQthr++;
+ }
+
+ /**
+ * Decrement the active thread count.
+ */
+ private void decThreads() {
+ m_iQthr--;
+ }
+
+ /**
* Action Processing Pipeline.
* <p/>
* Runs the actions in a listeners "actions" config on a message payload message received
@@ -246,7 +255,10 @@
} catch(Throwable thrown) {
processingError(initialObject, currentProcessor, thrown);
logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "]. Action [" + currentAction + "] threw an exception.", thrown);
- }
+ } finally {
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
+ }
}
/**
@@ -280,4 +292,4 @@
}
}
-} // ____________________________________________________________________________
+}
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 19:06:18 UTC (rev 7342)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 19:51:22 UTC (rev 7343)
@@ -68,7 +68,7 @@
}
//Start the action processing pipeline
ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
- this.pipelineExecutorPool.submit(pipelineRunner);
+ new Thread(pipelineRunner).start();
return payload;
}
More information about the jboss-svn-commits
mailing list