[jboss-svn-commits] JBL Code SVN: r7324 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 2 09:24:26 EST 2006


Author: jokum
Date: 2006-11-02 09:24:23 -0500 (Thu, 02 Nov 2006)
New Revision: 7324

Added:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Modified:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
Log:
HttpListener added to old architecture

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 14:24:23 UTC (rev 7324)
@@ -23,6 +23,8 @@
 package org.jboss.soa.esb.listeners.old;
 
 import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.actions.ActionDefinition;
@@ -63,6 +65,12 @@
     protected ActionDefinitionFactory m_oActionDefinitionFactory;
     protected MessageFactory m_oMsgFactory;
 
+	/* The name of the attribute in the configuration containing the number of threads in the pool*/
+	public static final String PARM_MAX_THREADS = "maxThreads";
+
+	/* The thread pool executing the action processing pipeline*/
+	protected ExecutorService pipelineExecutorPool = null;
+
     protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
         
         logger 		= Logger.getLogger(this.getClass());
@@ -75,11 +83,21 @@
         String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
         if(!sAtt.trim().equals("")) {
         	m_oActions 	= sAtt.split(",");
-        }
+        }    
         
-        sAtt		= GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-        int iMax 	= Integer.parseInt(sAtt);
-        m_iMaxThr 	= Math.min(iMax, m_iUpperThreadLimit);
+        //The number of threads configured
+		String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+		if (maxThreads != null) {
+			int iMax = Integer.parseInt(maxThreads);
+			this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+			if (this.logger.isInfoEnabled()){
+				this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
+			}			
+		} else {
+			this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
+		}
+		this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+        
     } // __________________________________
 
     /**
@@ -95,21 +113,10 @@
             	try { Thread.sleep(500); }
             	catch(InterruptedException e) {/*  ok  do nothing  */}
             } else {
-              for (Object currentObj : processList) {
-                if (m_iQthr >= m_iMaxThr) {
-                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
-                    try {
-                        Thread.sleep(m_iSleepForThreads);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    break;
-                }
-
+              for (Object currentObj : processList) {              
                 // Spawn a thread and push the message message through the pipeline...
                 ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                new Thread(runner).start();
-                incThreads();
+                this.pipelineExecutorPool.submit(runner);
               }
             }
         }
@@ -157,23 +164,9 @@
      * <p/>
      * Allows the listener to perform relevant close/cleanup tasks.
      */
-    protected abstract void close();
+    protected abstract void close();      
     
     /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
-    }
-
-    /**
-     * Decrement the active thread count.
-     */
-    private void decThreads() {
-        m_iQthr--;
-    }
-    
-    /**
      * Action Processing Pipeline.
      * <p/>
      * Runs the actions in a listeners "actions" config on a message payload message received
@@ -182,7 +175,7 @@
      * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
      * @since Version 4.0
      */
-    private class ActionProcessingPipeline implements Runnable {
+    protected class ActionProcessingPipeline implements Runnable {
         
 		private Object initialObject;
              
@@ -190,7 +183,7 @@
          * Private constructor.
          * @param pMessage The inital processing target message.
          */
-        private ActionProcessingPipeline(Object obj) {
+        protected ActionProcessingPipeline(Object obj) {
             initialObject = obj;
         }
 
@@ -253,10 +246,7 @@
             } catch(Throwable thrown) {
                 processingError(initialObject, currentProcessor, thrown);
                 logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            } finally {
-	            // Decrement the active thread count for the listener on completion...
-	            decThreads();
-            }
+            } 
         }
 
 		/**

Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java	2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java	2006-11-02 14:24:23 UTC (rev 7324)
@@ -0,0 +1,28 @@
+package org.jboss.soa.esb.listeners.old;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ * Base class to be extended by listener implementations whose channel
+ * implementation does the listening itself (like periodically receiving on a
+ * queue); {@link #receive()} here therefore always returns null.
+ * 
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
+ * 
+ */
+public abstract class AbstractPassiveListener extends AbstractListener {
+
+	protected AbstractPassiveListener(GpListener p_oDad, ConfigTree p_oParms,
+			ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+		super(p_oDad, p_oParms, actionDefinitionFactory);
+	}
+
+	@Override
+	protected Object[] receive() {
+		// Nothing to receive here: the channel implementation takes care of
+		// the blocking receive, so no messages are handed back (null result).
+		return null;
+	}
+
+}

Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 14:24:23 UTC (rev 7324)
@@ -0,0 +1,143 @@
+package org.jboss.soa.esb.listeners.old;
+
+import javax.management.MBeanServer;
+
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ * Listener that starts a JBoss Remoting Connector on the configured url and
+ * submits the payload of every invocation it receives to the pipeline thread pool.
+ * @author Johan Kumps
+ * 
+ */
+public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
+
+	/* The url to listen on */
+	public String listenHttpUrl = null;
+
+	/* The name of the configuration attribute holding the url to listen on */
+	private static final String LISTEN_HTTP_URL = "listenHttpURL";
+
+	/* The default transport this listener will listen on */
+	private static final String transport = "http";
+
+	/* The default hostname this listener will listen on */
+	private static final String host = "localhost";
+
+	/* The default port this listener will listen on */
+	private static final int port = 5400;
+
+	public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
+	  {
+		super(p_oDad, p_oParms, actionDefinitionFactory);
+		this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
+		// start the remoting server; NOTE(review): 'this' is registered as handler before construction completes
+		this.initServer();
+	  }
+
+	/**
+	 * Re-reads the listen url attribute from the listener configuration,
+	 * passing the default listen url to obtainAtt (presumably the fallback).
+	 * @throws Exception -
+	 *             declared by the signature; NOTE(review): nothing in this body
+	 *             validates actionClass, maxThreads or pollLatencySecs
+	 * 
+	 */
+	protected void checkParams() throws Exception {
+		// listener url
+		this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
+				this.getDefaultListenHttpUrl());
+	}
+	/** Handles an invocation: submits its payload to the pipeline pool and returns that same payload. */
+	public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+		//Retrieving the real payload of this invocationRequest
+		Object payload = invocationRequest.getParameter();
+
+		if (this.logger.isInfoEnabled()) {
+			this.logger
+					.info("HttpInvocationListener is invoked...The given payload is : "
+							+ payload);
+		}
+		//Submit an action processing pipeline for this payload to the thread pool
+		ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
+		this.pipelineExecutorPool.submit(pipelineRunner);
+		
+		return payload;
+	}
+
+	/**
+	 * Method returning the default listenHttpUrl for this HttpListener instance
+	 * 
+	 * @return the default listen url, built as transport://host:port
+	 */
+	private String getDefaultListenHttpUrl() {
+		return HttpListener.transport + "://" + HttpListener.host + ":"
+				+ HttpListener.port;
+	}
+
+	private void initServer() throws Exception {
+		InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
+		if (this.logger.isInfoEnabled()) {
+			this.logger.info("Starting remoting server with locator uri of: "
+					+ this.listenHttpUrl);
+		}
+		Connector connector = new Connector(locator);
+		connector.create();
+		connector.addInvocationHandler("HttpInvocationHandler", this);
+
+		// Starting the server daemon; the connector is only held in this local variable
+		connector.start();
+
+		if (this.logger.isInfoEnabled()) {
+			this.logger.info("HttpListener deamon started successfully!");
+		}		
+
+	}
+
+	@Override
+	protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+		// TODO no-op: pipeline errors are not handled by this listener yet
+		
+	}
+
+	@Override
+	protected void processingComplete(Object initialMsg) {
+		// TODO no-op: nothing is done on pipeline completion yet
+		
+	}
+
+	@Override
+	protected void close() {
+		// TODO no-op: the Connector started in initServer() is not stopped here
+		
+	}
+
+	public void setMBeanServer(MBeanServer arg0) {
+		// TODO no-op: the MBeanServer is ignored
+		
+	}
+
+	public void setInvoker(ServerInvoker arg0) {
+		// TODO no-op: the ServerInvoker is ignored
+		
+	}
+
+	public void addListener(InvokerCallbackHandler arg0) {
+		// TODO no-op: callback handlers are not registered
+		
+	}
+
+	public void removeListener(InvokerCallbackHandler arg0) {
+		// TODO no-op: callback handlers are not tracked
+		
+	}
+
+}




More information about the jboss-svn-commits mailing list