[jboss-svn-commits] JBL Code SVN: r7346 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 2 16:19:56 EST 2006
Author: jokum
Date: 2006-11-02 16:19:54 -0500 (Thu, 02 Nov 2006)
New Revision: 7346
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
HttpListener added to old architecture
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 20:46:57 UTC (rev 7345)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 21:19:54 UTC (rev 7346)
@@ -0,0 +1,295 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.old;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+/**
+ * Base abstract listener implementation.
+ * <p/>
+ * The listener thread repeatedly calls {@link #receive()} and hands each received object to an
+ * {@link ActionProcessingPipeline} running on a thread of its own. The number of pipelines
+ * running at the same time is capped by {@link #m_iMaxThr}.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractListener implements Runnable {
+
+    /**
+     * Name of the Message attachment carrying the list of actions to be applied to the
+     * incoming message. This allows the configured processing pipeline to be overridden by the
+     * Message producer.
+     */
+    public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
+
+    // You can override these values at constructor time of your
+    // derived class after calling super(GpListener,ConfigTree)
+    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
+    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+    /**
+     * Number of pipeline threads currently running. Written by the listener thread and by every
+     * pipeline thread, hence volatile for visibility; updates go through
+     * {@link #incThreads()} / {@link #decThreads()}.
+     */
+    protected volatile int m_iQthr = 0;
+    /** Maximum number of pipeline threads allowed to run at the same time. */
+    protected int m_iMaxThr;
+
+    protected ThreadGroup m_oThrGrp = null;
+    protected Logger logger;
+    protected GpListener m_oDad;
+    protected ConfigTree listenerConfig;
+    protected String[] m_oActions;
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+    protected MessageFactory m_oMsgFactory;
+
+    /** Guards the read-modify-write updates of {@link #m_iQthr}. */
+    private final Object m_oThreadCountLock = new Object();
+
+    /**
+     * Initialise the listener from its configuration.
+     *
+     * @param p_oDad The controlling listener; consulted to decide whether to keep looping.
+     * @param p_oParms The listener configuration. A clone is kept, not the supplied instance.
+     * @param actionDefinitionFactory Factory used to look up the definition of each action.
+     * @throws Exception If the configuration is invalid, e.g. a non numeric max threads value.
+     */
+    protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+
+        logger = Logger.getLogger(this.getClass());
+        m_oDad = p_oDad;
+        listenerConfig = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        m_oMsgFactory = MessageFactory.getInstance();
+        m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+
+        // The configured actions are optional: a message may carry its own list.
+        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+        if(!sAtt.trim().equals("")) {
+            m_oActions = sAtt.split(",");
+        }
+
+        sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+        int iMax = Integer.parseInt(sAtt);
+        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+    } // __________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes to
+     * finish normally.
+     * <p/>
+     * Once dispatching stops (normally or because the thread was interrupted) the method waits
+     * for the running pipelines to finish and then calls {@link #close()}. An interrupt seen
+     * along the way is restored on the thread before returning.
+     */
+    public void run() {
+        boolean interrupted = false;
+
+        try {
+            interrupted = dispatchUntilStopped();
+            if (awaitPipelines()) {
+                interrupted = true;
+            }
+
+            logger.info("All processing pipelines complete. Closing listener now.");
+
+            close();
+        } finally {
+            if (interrupted) {
+                // Sleeping cleared the flag; put it back for whoever owns this thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Receive objects and start one pipeline thread per object until the controlling class
+     * stops the loop or the thread is interrupted while waiting for a free pipeline thread.
+     *
+     * @return true if the thread was interrupted at some point, false otherwise.
+     */
+    private boolean dispatchUntilStopped() {
+        boolean interrupted = false;
+
+        while (m_oDad.continueLooping()) {
+            Object[] processList = receive();
+
+            if (null == processList) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // Keep polling until the controlling class stops us, but remember the interrupt.
+                    interrupted = true;
+                }
+                continue;
+            }
+
+            for (Object currentObj : processList) {
+                // Wait for a free thread instead of dropping the rest of the received batch.
+                if (!awaitFreeThread()) {
+                    return true;
+                }
+
+                // Count the pipeline before it starts: its finally block decrements the
+                // counter and could otherwise run before the increment.
+                incThreads();
+                boolean started = false;
+                try {
+                    // Spawn a thread and push the message through the pipeline...
+                    new Thread(new ActionProcessingPipeline(currentObj)).start();
+                    started = true;
+                } finally {
+                    if (!started) {
+                        // The thread never ran, so nobody else will give the slot back.
+                        decThreads();
+                    }
+                }
+            }
+        }
+
+        return interrupted;
+    }
+
+    /**
+     * Block until fewer pipelines are running than the configured maximum.
+     * <p/>
+     * A maximum below one is treated as one so that a received message can always be processed.
+     *
+     * @return true when a thread is available, false if the wait was interrupted.
+     */
+    private boolean awaitFreeThread() {
+        while (m_iQthr >= Math.max(1, m_iMaxThr)) {
+            logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+            try {
+                Thread.sleep(m_iSleepForThreads);
+            } catch (InterruptedException e) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Wait for all the processing pipelines to complete. Interrupts are logged and do not
+     * shorten the wait.
+     *
+     * @return true if the thread was interrupted while waiting, false otherwise.
+     */
+    private boolean awaitPipelines() {
+        boolean interrupted = false;
+
+        while(m_iQthr > 0) {
+            logger.info("Waiting for all processing pipelines to complete.");
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                interrupted = true;
+                logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
+            }
+        }
+
+        return interrupted;
+    }
+
+    /**
+     * Receive message from underlying channel implementation.
+     * <p/>
+     * Implementations must perform a blocking receive.
+     * @return An array of Objects received on the channel. May be null, in which case the
+     * listener pauses briefly before calling receive again.
+     */
+    protected abstract Object[] receive();
+
+    /**
+     * Called on the listener implementation when a pipeline processing error has occurred.
+     * @param initialMsg The message that was initially supplied to the pipeline.
+     * @param processor The processor that raised the error. Can be null where the error was raised before
+     * pipeline processing of the message.
+     * @param error The error. Can be null.
+     */
+    protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
+
+    /**
+     * Called on the listener implementation when pipeline processing of a message is complete.
+     * @param initialMsg The message that was initially supplied to the pipeline.
+     */
+    protected abstract void processingComplete(Object initialMsg);
+
+    /**
+     * Close the listener implementation.
+     * <p/>
+     * Allows the listener to perform relevant close/cleanup tasks.
+     */
+    protected abstract void close();
+
+    /**
+     * Increment the active thread count.
+     */
+    private void incThreads() {
+        synchronized (m_oThreadCountLock) {
+            m_iQthr++;
+        }
+    }
+
+    /**
+     * Decrement the active thread count.
+     */
+    private void decThreads() {
+        synchronized (m_oThreadCountLock) {
+            m_iQthr--;
+        }
+    }
+
+    /**
+     * Action Processing Pipeline.
+     * <p/>
+     * Runs the actions in a listeners "actions" config (or the actions named by the message's
+     * {@link AbstractListener#MESSAGE_PROCESSING_ACTIONS_LIST} attachment) on a payload
+     * received by the listener implementation.
+     *
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    protected class ActionProcessingPipeline implements Runnable {
+
+        private Object initialObject;
+
+        /**
+         * Create a pipeline run for one received object.
+         * @param obj The initial processing target. Either a Message or a payload that is
+         * wrapped in a new Message before processing.
+         */
+        protected ActionProcessingPipeline(Object obj) {
+            initialObject = obj;
+        }
+
+        /* (non-Javadoc)
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            String currentAction = null;
+            ActionProcessor currentProcessor = null;
+            String[] actions = null;
+
+            try {
+                Message message;
+
+                if(initialObject instanceof Message) {
+                    message = (Message)initialObject;
+                } else {
+                    message = m_oMsgFactory.getMessage();
+                    ActionUtils.setTaskObject(message,initialObject);
+                }
+
+                actions = getActions(message);
+
+                // Run the message through each ActionProcessor...
+                for(int i = 0; i < actions.length; i++) {
+                    // Decide "last" by position in the list actually being run; the listener
+                    // level list may be absent or different when the message overrides it.
+                    boolean lastAction = (i == actions.length - 1);
+                    ActionDefinition actionDefinition;
+
+                    currentAction = actions[i].trim();
+                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if(actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+
+                    // The processing result of each action feeds into the processing of the next action...
+                    currentProcessor = actionDefinition.getProcessor();
+                    try {
+                        ActionUtils.copyCurrentToPrevious(message);
+                        message = currentProcessor.process(message);
+                    } catch (Exception e) {
+                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
+                        throw e;
+                    }
+
+                    // Only the last action may end the pipeline with a null result.
+                    if(message == null && !lastAction) {
+                        String exceptionMessage = "Premature termination of action processing pipeline [" + describePipeline(actions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
+                        processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
+                        logger.warn(exceptionMessage);
+                        return;
+                    }
+                    // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
+                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
+
+                    // Setup the message for processing by the next processor...
+                    if(message != null) {
+                        message.getBody().remove(ActionUtils.BEFORE_ACTION);
+                    }
+                }
+
+                processingComplete(initialObject);
+            } catch(Throwable thrown) {
+                processingError(initialObject, currentProcessor, thrown);
+                logger.error("Premature termination of action processing pipeline [" + describePipeline(actions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
+            } finally {
+                // Decrement the active thread count for the listener on completion...
+                decThreads();
+            }
+        }
+
+        /**
+         * Render the action list for log and error messages.
+         * @param actions The list resolved for the current message, or null if it could not be resolved.
+         * @return The resolved list, else the listener level list, else an empty string.
+         */
+        private String describePipeline(String[] actions) {
+            String[] effective = (actions != null ? actions : m_oActions);
+            return (effective != null ? Arrays.asList(effective).toString() : "");
+        }
+
+        /**
+         * Get the list of actions to be applied to the supplied message.
+         * @param message The message to be processed.
+         * @return The set of processing actions to be performed on the message.
+         * @throws ActionProcessingException Invalid actions list attachment setting, or no
+         * actions configured anywhere.
+         */
+        private String[] getActions(Message message) throws ActionProcessingException {
+            // Check is there an attachment specifying an override pipeline config...
+            Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
+
+            if(overrideActionsAttachment == null) {
+                // Otherwise use the actions configured on the listener...
+                if(m_oActions == null || m_oActions.length == 0) {
+                    throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "]. Aborting message processing.");
+                }
+                return m_oActions;
+            }
+
+            if(!(overrideActionsAttachment instanceof String)) {
+                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String. Received [" + overrideActionsAttachment.getClass().getName() + "]. Aborting message processing.");
+            }
+
+            String overrideActions = (String)overrideActionsAttachment;
+            if(overrideActions.trim().equals("")) {
+                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value. Aborting message processing.");
+            }
+
+            return overrideActions.split(",");
+        }
+    }
+
+}
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 20:46:57 UTC (rev 7345)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 21:19:54 UTC (rev 7346)
@@ -0,0 +1,143 @@
+package org.jboss.soa.esb.listeners.old;
+
+import javax.management.MBeanServer;
+
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ *
+ *
+ * @author Johan Kumps
+ *
+ */
+public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
+
+ /* The url to listen on */
+ public String listenHttpUrl = null;
+
+ /* The url this listener will listen on */
+ private static final String LISTEN_HTTP_URL = "listenHttpURL";
+
+ /* The default transport this listener will listen on */
+ private static final String transport = "http";
+
+ /* The default hostname this listener will listen on */
+ private static final String host = "localhost";
+
+ /* The default port this listener will listen on */
+ private static final int port = 5400;
+
+ public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
+ {
+ super(p_oDad, p_oParms, actionDefinitionFactory);
+ this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
+ // initialize the HTTP server
+ this.initServer();
+ }
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if actionClass not specified or not in classpath or invalid
+ * int values for maxThreads or pollLatencySecs
+ *
+ */
+ protected void checkParams() throws Exception {
+ // listener url
+ this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
+ this.getDefaultListenHttpUrl());
+ }
+
+ public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+ //Retrieving the real payload of this invocationRequest
+ Object payload = invocationRequest.getParameter();
+
+ if (this.logger.isInfoEnabled()) {
+ this.logger
+ .info("HttpInvocationListener is invoked...The given payload is : "
+ + payload);
+ }
+ //Start the action processing pipeline
+ ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
+ new Thread(pipelineRunner).start();
+
+ return payload;
+ }
+
+ /**
+ * Method returning the default listenHttpUrl for this HttpListener instance
+ *
+ * @return the default listen url
+ */
+ private String getDefaultListenHttpUrl() {
+ return HttpListener.transport + "://" + HttpListener.host + ":"
+ + HttpListener.port;
+ }
+
+ private void initServer() throws Exception {
+ InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("Starting remoting server with locator uri of: "
+ + this.listenHttpUrl);
+ }
+ Connector connector = new Connector(locator);
+ connector.create();
+ connector.addInvocationHandler("HttpInvocationHandler", this);
+
+ // Starting the server deamon
+ connector.start();
+
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("HttpListener deamon started successfully!");
+ }
+
+ }
+
+ @Override
+ protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void processingComplete(Object initialMsg) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setMBeanServer(MBeanServer arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setInvoker(ServerInvoker arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addListener(InvokerCallbackHandler arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void removeListener(InvokerCallbackHandler arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
More information about the jboss-svn-commits
mailing list