[jboss-svn-commits] JBL Code SVN: r7346 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 2 16:19:56 EST 2006


Author: jokum
Date: 2006-11-02 16:19:54 -0500 (Thu, 02 Nov 2006)
New Revision: 7346

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
HttpListener added to old architecture

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 20:46:57 UTC (rev 7345)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 21:19:54 UTC (rev 7346)
@@ -0,0 +1,295 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.old;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+/**
+ * Base abstract listener implementation.
+ * <p/>
+ * Repeatedly calls {@link #receive()} and runs every received object through an
+ * {@link ActionProcessingPipeline} on a thread of its own.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractListener implements Runnable {
+
+    /**
+     * Name constant def for the Message attachment carrying the list of actions to be applied to the
+     * incoming message.  This allows the configured processing pipeline to be overridden by the Message
+     * producer.
+     */
+    public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
+
+    // m_iMaxThr is derived from m_iUpperThreadLimit inside the constructor, so a derived class that
+    // changes the limit after calling super(...) has to recompute m_iMaxThr itself.
+    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
+    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+    // Number of running pipelines.  Touched by the listener thread and by every pipeline thread:
+    // volatile for the reads, synchronized incThreads()/decThreads() for the updates.
+    protected volatile int  m_iQthr = 0;
+    protected int           m_iMaxThr;
+
+    protected ThreadGroup   m_oThrGrp = null;
+    protected Logger        logger;
+    protected GpListener    m_oDad;
+    protected ConfigTree    listenerConfig;
+    protected String[]      m_oActions;
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+    protected MessageFactory m_oMsgFactory;
+
+    /** Clones the config and reads its GpListener.PARM_ACTIONS and GpListener.PARM_MAX_THREADS attributes. */
+    protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        logger      = Logger.getLogger(this.getClass());
+        m_oDad      = p_oDad;
+        listenerConfig = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        m_oMsgFactory = MessageFactory.getInstance();
+        m_oThrGrp   = new ThreadGroup(listenerConfig.getName());
+
+        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+        if(!sAtt.trim().equals("")) {
+            m_oActions  = sAtt.split(",");
+        }
+
+        sAtt        = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+        int iMax    = Integer.parseInt(sAtt.trim()); // tolerate surrounding blanks in the attribute
+        m_iMaxThr   = Math.min(iMax, m_iUpperThreadLimit);
+    } // __________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes to
+     * finish normally: the running pipelines are waited for before close() is called
+     */
+    public void run() {
+        while (m_oDad.continueLooping()) {
+            Object[] processList = receive();
+            if (null == processList) {
+                // Nothing received: pause briefly; an interrupt merely shortens the pause.
+                try { Thread.sleep(500); }
+                catch(InterruptedException e) {/*  ok  do nothing  */}
+                continue;
+            }
+            for (Object currentObj : processList) {
+                if (m_iQthr >= m_iMaxThr) {
+                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+                    try {
+                        Thread.sleep(m_iSleepForThreads);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt(); // leave the interrupt status set on exit
+                        return;
+                    }
+                    break; // the rest of this batch is not dispatched now; receive() is called again
+                }
+                // Count the pipeline before its thread starts, so its decThreads() cannot run first.
+                incThreads();
+                new Thread(new ActionProcessingPipeline(currentObj)).start();
+            }
+        }
+
+        // Wait for all the processing pipelines to complete before closing the listener and exiting...
+        boolean interrupted = false;
+        while(m_iQthr > 0) {
+            logger.info("Waiting for all processing pipelines to complete.");
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
+                interrupted = true; // remembered here, restored once the wait is over
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+
+        logger.info("All processing pipelines complete. Closing listener now.");
+        close();
+    }
+
+    /**
+     * Receive message from underlying channel implementation (must be a blocking receive).
+     * @return An array of Objects received on the channel.  run() treats null as "nothing received".
+     */
+    protected abstract Object[] receive();
+
+    /**
+     * Called on the listener implementation when pipeline processing error has occurred.
+     * @param initialMsg The message that was initially supplied to the pipeline.
+     * @param processor The processor that raised the error.  Can be null where the error was raised before
+     * pipeline processing of the message.
+     * @param error The error.  Can be null.
+     */
+    protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
+
+    /**
+     * Called on the listener implementation when pipeline processing of a message is complete.
+     * @param initialMsg The message that was initially supplied to the pipeline.
+     */
+    protected abstract void processingComplete(Object initialMsg);
+
+    /**
+     * Close the listener implementation, allowing it to perform relevant close/cleanup tasks.
+     */
+    protected abstract void close();
+
+    /** Increment the active thread count (synchronized: pipeline threads update the count too). */
+    private synchronized void incThreads() {
+        m_iQthr++;
+    }
+
+    /** Decrement the active thread count; called by each pipeline thread as it finishes. */
+    private synchronized void decThreads() {
+        m_iQthr--;
+    }
+
+    /**
+     * Action Processing Pipeline.
+     * <p/>
+     * Runs the actions in a listeners "actions" config on a message payload message received
+     * by the listener implementation.
+     *
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    protected class ActionProcessingPipeline implements Runnable {
+
+        private Object initialObject;
+
+        /**
+         * @param obj The initial processing target: a Message, or a payload that run() wraps in one.
+         */
+        protected ActionProcessingPipeline(Object obj) {
+            initialObject = obj;
+        }
+
+        /**
+         * Runs the message through every action in turn and reports the outcome to the listener.
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            String currentAction = null;
+            ActionProcessor currentProcessor = null;
+            String[] actions = m_oActions; // only logged as-is when getActions() itself fails
+
+            try {
+                Message message;
+                if(initialObject instanceof Message) {
+                    message = (Message)initialObject;
+                } else {
+                    message = m_oMsgFactory.getMessage();
+                    ActionUtils.setTaskObject(message,initialObject);
+                }
+
+                // Either the listener's own list or an override carried by the message.
+                actions = getActions(message);
+
+                // Run the message through each ActionProcessor...
+                for(int i = 0; i < actions.length; i++) {
+                    ActionDefinition actionDefinition;
+
+                    currentAction = actions[i].trim();
+                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if(actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+
+                    // The processing result of each action feeds into the processing of the next action...
+                    currentProcessor = actionDefinition.getProcessor();
+                    try {
+                        ActionUtils.copyCurrentToPrevious(message);
+                        message = currentProcessor.process(message);
+                    } catch (Exception e) {
+                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
+                        throw e;
+                    }
+
+                    // A null result is only acceptable from the last action of the list that is actually
+                    // being run, so test the position in that list and not the listener's m_oActions.
+                    if(message == null && i < actions.length - 1) {
+                        String exceptionMessage = "Premature termination of action processing pipeline [" + Arrays.asList(actions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
+                        processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
+                        logger.warn(exceptionMessage);
+                        return;
+                    }
+                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
+                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
+
+                    // Setup the message for processing by the next processor...
+                    if(message != null) {
+                        message.getBody().remove(ActionUtils.BEFORE_ACTION);
+                    }
+                }
+
+                processingComplete(initialObject);
+            } catch(Throwable thrown) {
+                processingError(initialObject, currentProcessor, thrown);
+                logger.error("Premature termination of action processing pipeline [" + (actions != null?Arrays.asList(actions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
+            } finally {
+                // Decrement the active thread count for the listener on completion...
+                decThreads();
+            }
+        }
+
+        /**
+         * Get the list of actions to be applied to the supplied message.
+         * @param message The message to be processed.
+         * @return The set of processing actions to be performed on the message.
+         * @throws ActionProcessingException Invalid actions list attachment setting.
+         */
+        private String[] getActions(Message message) throws ActionProcessingException {
+            // Check is there an attachment specifying an override pipeline config...
+            Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
+            if(overrideActionsAttachment == null) {
+                // Otherwise use the actions configured on the listener...
+                if(m_oActions == null || m_oActions.length == 0) {
+                    throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "].  Aborting message processing.");
+                }
+                return m_oActions;
+            }
+            if(!(overrideActionsAttachment instanceof String)) {
+                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String.  Received [" + overrideActionsAttachment.getClass().getName() + "].  Aborting message processing.");
+            }
+            String overrideActions = (String)overrideActionsAttachment;
+            if(overrideActions.trim().equals("")) {
+                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value.  Aborting message processing.");
+            }
+            return overrideActions.split(",");
+        }
+    }
+
+}

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 20:46:57 UTC (rev 7345)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 21:19:54 UTC (rev 7346)
@@ -0,0 +1,143 @@
+package org.jboss.soa.esb.listeners.old;
+
+import javax.management.MBeanServer;
+
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ * Listener that starts a JBoss Remoting {@link Connector} on the configured url and hands the
+ * payload of every invocation it receives to an action processing pipeline.
+ *
+ * @author Johan Kumps
+ */
+public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
+
+    /* The url to listen on */
+    public String listenHttpUrl = null;
+
+    /* Name of the config attribute holding the url this listener will listen on */
+    private static final String LISTEN_HTTP_URL = "listenHttpURL";
+
+    /* The default transport this listener will listen on */
+    private static final String DEFAULT_TRANSPORT = "http";
+
+    /* The default hostname this listener will listen on */
+    private static final String DEFAULT_HOST = "localhost";
+
+    /* The default port this listener will listen on */
+    private static final int DEFAULT_PORT = 5400;
+
+    /* The running remoting server; kept so that close() can shut it down */
+    private Connector connector;
+
+    /** Reads the listen url from the given config and starts the remoting (HTTP) server on it. */
+    public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
+      {
+        super(p_oDad, p_oParms, actionDefinitionFactory);
+        this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
+        this.initServer();
+      }
+
+    /**
+     * Re-reads the listen url from the listener config, passing the default url to obtainAtt as fallback.
+     * @throws Exception - declared by the signature; this implementation throws nothing explicitly
+     */
+    protected void checkParams() throws Exception {
+        this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
+                this.getDefaultListenHttpUrl());
+    }
+
+    /**
+     * Handles an incoming invocation by running its payload through a pipeline on a new thread.
+     * @return the payload of the request, returned as soon as the pipeline thread has been started
+     */
+    public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+        //Retrieving the real payload of this invocationRequest
+        Object payload = invocationRequest.getParameter();
+
+        if (this.logger.isInfoEnabled()) {
+            this.logger
+                    .info("HttpInvocationListener is invoked...The given payload is : "
+                            + payload);
+        }
+        //Start the action processing pipeline
+        ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
+        new Thread(pipelineRunner).start();
+
+        return payload;
+    }
+
+    /**
+     * Method returning the default listenHttpUrl for this HttpListener instance
+     * @return the default listen url, built from the default transport, host and port
+     */
+    private String getDefaultListenHttpUrl() {
+        return HttpListener.DEFAULT_TRANSPORT + "://" + HttpListener.DEFAULT_HOST + ":"
+                + HttpListener.DEFAULT_PORT;
+    }
+
+    /** Creates and starts the remoting server for listenHttpUrl, with this listener as its handler. */
+    private void initServer() throws Exception {
+        InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
+        if (this.logger.isInfoEnabled()) {
+            this.logger.info("Starting remoting server with locator uri of: "
+                    + this.listenHttpUrl);
+        }
+        Connector server = new Connector(locator);
+        server.create();
+        server.addInvocationHandler("HttpInvocationHandler", this);
+
+        // Starting the server daemon
+        server.start();
+        this.connector = server; // remembered only once it is actually running
+
+        if (this.logger.isInfoEnabled()) {
+            this.logger.info("HttpListener deamon started successfully!");
+        }
+    }
+
+    @Override
+    protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected void processingComplete(Object initialMsg) {
+        // TODO Auto-generated method stub
+    }
+
+    /** Stops and destroys the remoting server started by initServer(), if there is one. */
+    @Override
+    protected void close() {
+        if (this.connector != null) {
+            this.connector.stop();
+            this.connector.destroy();
+            this.connector = null;
+        }
+    }
+
+    public void setMBeanServer(MBeanServer arg0) {
+        // TODO Auto-generated method stub
+    }
+
+    public void setInvoker(ServerInvoker arg0) {
+        // TODO Auto-generated method stub
+    }
+
+    public void addListener(InvokerCallbackHandler arg0) {
+        // TODO Auto-generated method stub
+    }
+
+    public void removeListener(InvokerCallbackHandler arg0) {
+        // TODO Auto-generated method stub
+    }
+
+}




More information about the jboss-svn-commits mailing list