[jboss-svn-commits] JBL Code SVN: r6535 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Oct 2 17:25:48 EDT 2006


Author: jokum
Date: 2006-10-02 17:25:46 -0400 (Mon, 02 Oct 2006)
New Revision: 6535

Added:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/HttpInvocationListener.java
Log:
Implemented HttpListener

Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-02 21:24:58 UTC (rev 6534)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-02 21:25:46 UTC (rev 6535)
@@ -0,0 +1,261 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Base abstract message listener implementation. This is a 'passive' message
+ * listener to be used when the underlying channel implementation implements the
+ * blocking receive itself. For example if the concrete listener class is implemented using
+ * the JBoss Remoting channel implementations (For example HttpListener).
+ * 
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
+ * @since Version 1.0
+ */
+public abstract class AbstractListener implements Runnable {
+
+    // You can override these values at constructor time of your
+    // derived class after calling super(GpListener,DomElement,ActionDefinitionFactory)
+    protected int m_iSleepForThreads = 3000; // default sleep (ms) if no threads available
+    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+    // Number of pipelines currently running. Updated by pipeline threads and
+    // polled by dispatching threads, hence volatile.
+    protected volatile int m_iQthr = 0;
+
+    // Maximum number of pipelines allowed to run at once (clamped to >= 1).
+    protected int m_iMaxThr;
+
+    protected ThreadGroup m_oThrGrp = null;
+
+    protected Logger logger;
+
+    protected GpListener m_oDad;
+
+    protected DomElement listenerConfig;
+
+    protected String[] m_oActions;
+
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+    /**
+     * Does nothing: this base class has no receive loop of its own.
+     */
+    public void run() {
+    }
+
+    /**
+     * Captures the listener configuration and derives the action list and
+     * the thread limit from it.
+     *
+     * @param p_oDad the owning listener
+     * @param p_oParms the listener configuration; a clone of it is kept
+     * @param actionDefinitionFactory factory used to look up action definitions by name
+     * @throws ConfigurationException if the 'actions' attribute is missing or blank
+     * @throws NumberFormatException if the max threads attribute is not an integer
+     * @throws Exception if reading the configuration fails for any other reason
+     */
+    protected AbstractListener(GpListener p_oDad, DomElement p_oParms,
+            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        logger = Logger.getLogger(this.getClass());
+        m_oDad = p_oDad;
+        listenerConfig = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+
+        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+        m_oActions = sAtt.split(",");
+
+        // "".split(",") yields [""], so the length check alone would never
+        // catch a missing attribute; test the raw value as well.
+        if (sAtt.trim().length() == 0 || m_oActions.length == 0) {
+            throw new ConfigurationException("Listener 'actions' list must be specified.");
+        }
+
+        sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+        int iMax = Integer.parseInt(sAtt);
+        // Clamp to [1, m_iUpperThreadLimit]: a limit below 1 could never be satisfied.
+        m_iMaxThr = Math.max(1, Math.min(iMax, m_iUpperThreadLimit));
+    }
+
+    /**
+     * Increment the active thread count.
+     */
+    private synchronized void incThreads() {
+        m_iQthr++;
+    }
+
+    /**
+     * Decrement the active thread count.
+     */
+    private synchronized void decThreads() {
+        m_iQthr--;
+    }
+
+    /**
+     * Called on the listener implementation when a pipeline processing error has occurred.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     * @param processor The most recently obtained processor when the error was raised; may be null.
+     * @param error The error.
+     */
+    protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+    /**
+     * Called on the listener implementation when pipeline processing of a message is complete.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     */
+    protected abstract void processingComplete(Object initialMessage);
+
+    /**
+     * Action Processing Pipeline controller. <p/> Delays the pipeline execution
+     * until there is a thread available.
+     *
+     * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
+     * @since Version 1.0
+     */
+    protected class ActionProcessingPipelineController implements Runnable {
+
+        /*
+         * The ActionProcessingPipeline to be controlled (held back until
+         * a thread is available)
+         */
+        private ActionProcessingPipeline actionProcessingPipeline = null;
+
+        protected ActionProcessingPipelineController(
+                ActionProcessingPipeline actionProcessingPipeline) {
+            this.actionProcessingPipeline = actionProcessingPipeline;
+        }
+
+        public void run() {
+            boolean interrupted = false;
+            // A slot is free only while fewer than m_iMaxThr pipelines are running.
+            while (m_iQthr >= m_iMaxThr) {
+                try {
+                    Thread.sleep(m_iSleepForThreads);
+                } catch (InterruptedException e) {
+                    // Keep waiting so the message is not dropped, but remember
+                    // the interrupt so it can be restored for the thread's owner.
+                    interrupted = true;
+                }
+            }
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+            // Now there is a thread available --> start the pipeline
+            //TODO use thread pool!
+            new Thread(this.actionProcessingPipeline).start();
+        }
+    }
+
+    /**
+     * Action Processing Pipeline.
+     * <p/>
+     * Runs the actions in a listeners "actions" config on a message payload message received
+     * by the listener implementation.
+     *
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    protected class ActionProcessingPipeline implements Runnable {
+
+        private Object initialMessage;
+
+        /**
+         * @param initialMessage The initial processing target message.
+         */
+        protected ActionProcessingPipeline(Object initialMessage) {
+            this.initialMessage = initialMessage;
+        }
+
+        /* (non-Javadoc)
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            // Increment the active thread count for the listener on starting...
+            incThreads();
+            try {
+                runActions();
+                processingComplete(initialMessage);
+            } finally {
+                // ...and always release the slot, even if a callback throws.
+                decThreads();
+            }
+        }
+
+        /**
+         * Runs the message through each configured action in turn, feeding the
+         * result of one action into the next. Any failure is reported through
+         * processingError(...) and logged; it is not rethrown.
+         */
+        private void runActions() {
+            String currentAction = null;
+            ActionProcessor currentProcessor = null;
+
+            try {
+                Object message = initialMessage;
+
+                // Run the message through each ActionProcessor...
+                for (int i = 0; i < m_oActions.length; i++) {
+                    currentAction = m_oActions[i].trim();
+                    ActionDefinition actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if (actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+
+                    // The processing result of each action feeds into the processing of the next action...
+                    currentProcessor = actionDefinition.getProcessor();
+                    Object processingResult;
+                    try {
+                        processingResult = currentProcessor.process(message);
+                    } catch (Exception e) {
+                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
+                        throw e;
+                    }
+
+                    // Test the result just produced (not the input): a null result before
+                    // the last action leaves nothing to hand to the next processor.
+                    if (processingResult == null && i < m_oActions.length - 1) {
+                        logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
+                        break;
+                    }
+                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
+                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
+
+                    // Setup the message for processing by the next processor...
+                    message = processingResult;
+                }
+            } catch (Throwable thrown) {
+                processingError(initialMessage, currentProcessor, thrown);
+                logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] threw an exception.", thrown);
+            }
+        }
+    }
+}
\ No newline at end of file

Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/HttpInvocationListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/HttpInvocationListener.java	2006-10-02 21:24:58 UTC (rev 6534)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/HttpInvocationListener.java	2006-10-02 21:25:46 UTC (rev 6535)
@@ -0,0 +1,230 @@
+package org.jboss.soa.esb.listeners;
+
+import javax.management.MBeanServer;
+
+import org.apache.log4j.Logger;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Http listener implementation using the JBoss Remoting channel.
+ *
+ * The listener will listen to messages on the configured listenHttpUrl. If this is not set, the default
+ * http://localhost:5400 will be used.
+ *
+ * Sample listener configurations:
+ *
+ * <pre>
+ * &lt;HttpListenerTest listenerClass="org.jboss.soa.esb.listeners.HttpInvocationListener" actions="HttpInvocationToFile"/&gt;
+ * </pre>
+ *
+ * <pre>
+ * &lt;HttpListener listenerClass="org.jboss.soa.esb.listeners.HttpInvocationListener" actions="ObjectToFile" listenHttpUrl="http://localhost:8800"&gt;
+ * &lt;/HttpListener&gt;
+ * </pre>
+ *
+ * <pre>
+ * &lt;HttpListener listenerClass="org.jboss.soa.esb.listeners.HttpInvocationListener" actions="ObjectToFile"&gt;
+ * &lt;/HttpListener&gt;
+ * </pre>
+ *
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
+ *
+ */
+public class HttpInvocationListener extends AbstractListener implements
+        ServerInvocationHandler {
+
+    /* The logger for this class */
+    protected Logger logger = Logger.getLogger(HttpInvocationListener.class);
+
+    /* Boolean indicating whether the info logging level is enabled */
+    protected boolean info = this.logger.isInfoEnabled();
+
+    /* Name of the configuration attribute holding the url to listen on */
+    private static final String LISTEN_HTTP_URL = "listenHttpUrl";
+
+    /* The url to listen on */
+    public String listenHttpUrl = null;
+
+    /* The default transport this listener will listen on */
+    private static final String transport = "http";
+
+    /* The default hostname this listener will listen on */
+    private static final String host = "localhost";
+
+    /* The default port this listener will listen on */
+    private static final int port = 5400;
+
+    /**
+     * Constructor initialising this HttpListener: reads the listen url from the
+     * configuration and starts the remoting server.
+     *
+     * @param commandListener the owning listener, passed to the superclass
+     * @param listenerConfig the listener configuration
+     * @param actionDefinitionFactory factory passed to the superclass
+     * @throws Exception if the configuration is invalid or the server cannot be started
+     */
+    public HttpInvocationListener(GpListener commandListener,
+            DomElement listenerConfig,
+            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        super(commandListener, listenerConfig, actionDefinitionFactory);
+        if (this.logger.isDebugEnabled()) {
+            this.logger.debug("HttpListener constructor called!");
+        }
+        this.checkParams();
+        this.initServer();
+    }
+
+    /**
+     * Hands the request payload to a new action processing pipeline running on
+     * its own thread. When the listener is already at its thread limit the
+     * pipeline is handed to a controller that starts it once a slot is free.
+     *
+     * @param invocationRequest the incoming remoting request
+     * @return always null; the pipeline runs asynchronously
+     * @see org.jboss.remoting.ServerInvocationHandler#invoke(org.jboss.remoting.InvocationRequest)
+     */
+    public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+        //Retrieving the real payload of this invocationRequest
+        Object payload = invocationRequest.getRequestPayload();
+
+        ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
+        // Strictly less than: with m_iQthr == m_iMaxThr every slot is already taken.
+        if (this.m_iQthr < this.m_iMaxThr) {
+            if (this.logger.isDebugEnabled()) {
+                this.logger.debug("Enough thread --> spwaning a new one to execute pipeline");
+            }
+            new Thread(pipelineRunner).start();
+        } else {
+            //We have to wait for a spare listener thread to start the processing pipeline
+            //TODO use a threadpool!
+            new Thread(new ActionProcessingPipelineController(pipelineRunner)).start();
+        }
+        //TODO ?????
+        return null;
+    }
+
+    /**
+     * Adds a callback handler that will listen for callbacks from the server
+     * invoker handler.
+     *
+     * @param callbackHandler
+     */
+    public void addListener(InvokerCallbackHandler callbackHandler) {
+        // NO OP: callback listeners are not handled by this listener
+    }
+
+    /**
+     * Removes the callback handler that was listening for callbacks from the
+     * server invoker handler.
+     *
+     * @param callbackHandler
+     */
+    public void removeListener(InvokerCallbackHandler callbackHandler) {
+        // NO OP: callback listeners are not handled by this listener
+    }
+
+    /**
+     * set the mbean server that the handler can reference
+     *
+     * @param server
+     */
+    public void setMBeanServer(MBeanServer server) {
+        // NO OP as do not need reference to MBeanServer for this handler
+    }
+
+    /**
+     * set the invoker that owns this handler
+     *
+     * @param invoker
+     */
+    public void setInvoker(ServerInvoker invoker) {
+        // NO OP as do not need reference back to the server invoker
+    }
+
+    /**
+     * Method getting the url this HttpListener instance is listening on
+     * @return the current listenHttpUrl
+     */
+    public String getListenHttpUrl() {
+        return listenHttpUrl;
+    }
+
+    /**
+     * Method setting the listenHttpUrl property; this only updates the field, the server is not restarted
+     * @param listenHttpUrl the listenHttpUrl to be used by this HttpListener instance
+     */
+    public void setListenHttpUrl(String listenHttpUrl) {
+        this.listenHttpUrl = listenHttpUrl;
+    }
+
+    /**
+     * Not implemented yet: pipeline errors are ignored by this listener.
+     */
+    @Override
+    protected void processingError(Object initialMessage,
+            ActionProcessor processor, Throwable error) {
+        // TODO Auto-generated method stub
+    }
+
+    /**
+     * Not implemented yet: nothing is done when a pipeline completes.
+     */
+    @Override
+    protected void processingComplete(Object initialMessage) {
+        // TODO Auto-generated method stub
+    }
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     */
+    private void checkParams() throws Exception {
+        // listener url
+        this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig,
+                LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
+    }
+
+    /**
+     * Method returning the default listenHttpUrl for this HttpListener instance
+     *
+     * @return the default listen url
+     */
+    private String getDefaultListenHttpUrl() {
+        return HttpInvocationListener.transport + "://"
+                + HttpInvocationListener.host + ":"
+                + HttpInvocationListener.port;
+    }
+
+    /**
+     * Method initialising the remoting daemon
+     *
+     * @throws Exception
+     *             when something goes wrong during remoting daemon startup
+     */
+    private void initServer() throws Exception {
+        InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
+        if (this.logger.isInfoEnabled()) {
+            this.logger.info("Starting remoting server with locator uri of: "
+                    + this.listenHttpUrl);
+        }
+        Connector connector = new Connector(locator);
+        connector.create();
+
+        connector.addInvocationHandler("HttpInvocationHandler", this);
+
+        // Starting the server daemon
+        connector.start();
+
+        if (this.logger.isInfoEnabled()) {
+            this.logger.info("HttpListener deamon started successfully!");
+        }
+    }
+
+}




More information about the jboss-svn-commits mailing list