[jboss-svn-commits] JBL Code SVN: r7327 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 2 10:02:50 EST 2006


Author: jokum
Date: 2006-11-02 10:02:49 -0500 (Thu, 02 Nov 2006)
New Revision: 7327

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
Log:
HttpListener added to old architecture

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 14:25:35 UTC (rev 7326)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 15:02:49 UTC (rev 7327)
@@ -23,6 +23,8 @@
 package org.jboss.soa.esb.listeners.old;
 
 import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.actions.ActionDefinition;
@@ -63,6 +65,12 @@
     protected ActionDefinitionFactory m_oActionDefinitionFactory;
     protected MessageFactory m_oMsgFactory;
 
+	/* The name of the attribute in the configuration containing the number of threads in the pool*/
+	public static final String PARM_MAX_THREADS = "maxThreads";
+
+	/* The thread pool executing the action processing pipeline*/
+	protected ExecutorService pipelineExecutorPool = null;
+
     protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
         
         logger 		= Logger.getLogger(this.getClass());
@@ -75,11 +83,21 @@
         String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
         if(!sAtt.trim().equals("")) {
         	m_oActions 	= sAtt.split(",");
-        }
+        }    
         
-        sAtt		= GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-        int iMax 	= Integer.parseInt(sAtt);
-        m_iMaxThr 	= Math.min(iMax, m_iUpperThreadLimit);
+        //The number of threads configured
+		String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+		if (maxThreads != null) {
+			int iMax = Integer.parseInt(maxThreads);
+			this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+			if (this.logger.isInfoEnabled()){
+				this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
+			}			
+		} else {
+			this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
+		}
+		this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+        
     } // __________________________________
 
     /**
@@ -95,21 +113,10 @@
             	try { Thread.sleep(500); }
             	catch(InterruptedException e) {/*  ok  do nothing  */}
             } else {
-              for (Object currentObj : processList) {
-                if (m_iQthr >= m_iMaxThr) {
-                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
-                    try {
-                        Thread.sleep(m_iSleepForThreads);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    break;
-                }
-
+              for (Object currentObj : processList) {              
                 // Spawn a thread and push the message message through the pipeline...
                 ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                new Thread(runner).start();
-                incThreads();
+                this.pipelineExecutorPool.submit(runner);
               }
             }
         }
@@ -157,23 +164,9 @@
      * <p/>
      * Allows the listener to perform relevant close/cleanup tasks.
      */
-    protected abstract void close();
+    protected abstract void close();      
     
     /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
-    }
-
-    /**
-     * Decrement the active thread count.
-     */
-    private void decThreads() {
-        m_iQthr--;
-    }
-    
-    /**
      * Action Processing Pipeline.
      * <p/>
      * Runs the actions in a listeners "actions" config on a message payload message received
@@ -182,7 +175,7 @@
      * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
      * @since Version 4.0
      */
-    private class ActionProcessingPipeline implements Runnable {
+    protected class ActionProcessingPipeline implements Runnable {
         
 		private Object initialObject;
              
@@ -190,7 +183,7 @@
          * Private constructor.
          * @param pMessage The inital processing target message.
          */
-        private ActionProcessingPipeline(Object obj) {
+        protected ActionProcessingPipeline(Object obj) {
             initialObject = obj;
         }
 
@@ -253,10 +246,7 @@
             } catch(Throwable thrown) {
                 processingError(initialObject, currentProcessor, thrown);
                 logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            } finally {
-	            // Decrement the active thread count for the listener on completion...
-	            decThreads();
-            }
+            } 
         }
 
 		/**




More information about the jboss-svn-commits mailing list