[jboss-svn-commits] JBL Code SVN: r7343 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 2 14:51:24 EST 2006


Author: jokum
Date: 2006-11-02 14:51:22 -0500 (Thu, 02 Nov 2006)
New Revision: 7343

Modified:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
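Replaced the ExecutorService thread pool with directly spawned threads (throttled by an active-thread counter in AbstractListener), because AbstractListenerUnitTest was failing after the refactoring to a thread pool.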

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 19:06:18 UTC (rev 7342)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 19:51:22 UTC (rev 7343)
@@ -65,12 +65,6 @@
     protected ActionDefinitionFactory m_oActionDefinitionFactory;
     protected MessageFactory m_oMsgFactory;
 
-	/* The name of the attribute in the configuration containing the number of threads in the pool*/
-	public static final String PARM_MAX_THREADS = "maxThreads";
-
-	/* The thread pool executing the action processing pipeline*/
-	protected ExecutorService pipelineExecutorPool = null;
-
     protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
         
         logger 		= Logger.getLogger(this.getClass());
@@ -83,21 +77,11 @@
         String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
         if(!sAtt.trim().equals("")) {
         	m_oActions 	= sAtt.split(",");
-        }    
+        }
         
-        //The number of threads configured
-		String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-		if (maxThreads != null) {
-			int iMax = Integer.parseInt(maxThreads);
-			this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-			if (this.logger.isInfoEnabled()){
-				this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
-			}			
-		} else {
-			this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
-		}
-		this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
-        
+        sAtt		= GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+        int iMax 	= Integer.parseInt(sAtt);
+        m_iMaxThr 	= Math.min(iMax, m_iUpperThreadLimit);
     } // __________________________________
 
     /**
@@ -113,10 +97,21 @@
             	try { Thread.sleep(500); }
             	catch(InterruptedException e) {/*  ok  do nothing  */}
             } else {
-              for (Object currentObj : processList) {              
+              for (Object currentObj : processList) {
+                if (m_iQthr >= m_iMaxThr) {
+                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+                    try {
+                        Thread.sleep(m_iSleepForThreads);
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                    break;
+                }
+
                 // Spawn a thread and push the message message through the pipeline...
                 ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                this.pipelineExecutorPool.submit(runner);
+                new Thread(runner).start();
+                incThreads();
               }
             }
         }
@@ -164,9 +159,23 @@
      * <p/>
      * Allows the listener to perform relevant close/cleanup tasks.
      */
-    protected abstract void close();      
+    protected abstract void close();
     
     /**
+     * Increment the active thread count.
+     */
+    private void incThreads() {
+        m_iQthr++;
+    }
+
+    /**
+     * Decrement the active thread count.
+     */
+    private void decThreads() {
+        m_iQthr--;
+    }
+    
+    /**
      * Action Processing Pipeline.
      * <p/>
      * Runs the actions in a listeners "actions" config on a message payload message received
@@ -246,7 +255,10 @@
             } catch(Throwable thrown) {
                 processingError(initialObject, currentProcessor, thrown);
                 logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            } 
+            } finally {
+	            // Decrement the active thread count for the listener on completion...
+	            decThreads();
+            }
         }
 
 		/**
@@ -280,4 +292,4 @@
 		}
     }
     
-} // ____________________________________________________________________________
+}

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 19:06:18 UTC (rev 7342)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 19:51:22 UTC (rev 7343)
@@ -68,7 +68,7 @@
 		}
 		//Start the action processing pipeline
 		ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
-		this.pipelineExecutorPool.submit(pipelineRunner);
+		new Thread(pipelineRunner).start();
 		
 		return payload;
 	}




More information about the jboss-svn-commits mailing list