[jboss-svn-commits] JBL Code SVN: r6533 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Oct 2 17:24:22 EDT 2006


Author: jokum
Date: 2006-10-02 17:24:19 -0400 (Mon, 02 Oct 2006)
New Revision: 6533

Added:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java
Removed:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
Modified:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Log:
AbstractListener refactored to AbstractActiveListener

Copied: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java (from rev 6509, labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java)
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-01 10:55:48 UTC (rev 6509)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java	2006-10-02 21:24:19 UTC (rev 6533)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Base abstract message listener implementation.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractActiveListener extends AbstractListener{     
+
+    protected AbstractActiveListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+    	super (p_oDad, p_oParms, actionDefinitionFactory);      
+    } // __________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * the controlling class (ref in m_oDad) indicates no more looping is allowed
+     * for all child classes <p/> This condition will not prevent child processes
+     * from finishing normally
+     */
+    public void run() {
+        while (m_oDad.continueLooping()) {
+            Object[] processList = receive();
+            
+            for (Object message : processList) {
+                if (m_iQthr >= m_iMaxThr) {
+                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+                    try {
+                        Thread.sleep(m_iSleepForThreads);
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                    break;
+                }
+
+                // Spawn a thread and push the message through the pipeline...
+                ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
+                new Thread(runner).start();
+            }
+        }
+        
+        close();
+    }
+    
+    /**
+     * Receive message from underlying channel implementation.
+     * <p/>
+     * Implementations must perform a blocking receive.
+     * @return An array of Objects received on the channel.
+     */
+    protected abstract Object[] receive();
+
+    /**
+     * Called on the listener implementation when a pipeline processing error has occurred.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     * @param processor The processor that raised the error.
+     * @param error The error.
+     */
+    protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+    /**
+     * Called on the listener implementation when pipeline processing of a message is complete.
+     * @param initialMessage The message reference that was initially supplied to the pipeline.
+     */
+    protected abstract void processingComplete(Object initialMessage);
+    
+    /**
+     * Close the listener implementation.
+     * <p/>
+     * Allows the listener to perform relevant close/cleanup tasks.
+     */
+    protected abstract void close();    
+    
+} // ____________________________________________________________________________

Deleted: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-02 21:24:19 UTC (rev 6533)
@@ -1,230 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.soa.esb.listeners;
-
-import java.util.*;
-
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.helpers.*;
-
-/**
- * Base abstract message listener implementation.
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
-public abstract class AbstractListener implements Runnable {   
-    
-    // You can override these values at constructor time of your
-    // derived class after calling super(GpListener,DomElement)
-    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
-    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
-
-    protected int m_iQthr = 0, m_iMaxThr;
-
-    protected ThreadGroup m_oThrGrp = null;
-
-    protected Logger logger;
-
-    protected GpListener m_oDad;
-
-    protected DomElement listenerConfig;
-
-    protected String[] m_oActions;
-
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
-    protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-        
-        logger = Logger.getLogger(this.getClass());
-        m_oDad = p_oDad;
-        listenerConfig = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
-        m_oThrGrp = new ThreadGroup(listenerConfig.getName());
-
-        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
-        m_oActions = sAtt.split(",");
-        
-        if(m_oActions.length == 0) {
-            throw new ConfigurationException("Listener 'actions' list must be specified.");
-        }
-
-        sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-        int iMax = Integer.parseInt(sAtt);
-        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-    } // __________________________________
-
-    /**
-     * Implement run method for this Runnable <p/> Will continue to run until
-     * controlling class (ref in m_oDad) indicates no more looping allowed for
-     * all child classes <p/> This condition will not prevent child processes to
-     * finish normally
-     */
-    public void run() {
-        while (m_oDad.continueLooping()) {
-            Object[] processList = receive();
-            
-            for (Object message : processList) {
-                if (m_iQthr >= m_iMaxThr) {
-                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
-                    try {
-                        Thread.sleep(m_iSleepForThreads);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    break;
-                }
-
-                // Spawn a thread and push the message message through the pipeline...
-                ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
-                new Thread(runner).start();
-            }
-        }
-        
-        close();
-    }
-    
-    /**
-     * Receive message from underlying channel implementation.
-     * <p/>
-     * Implementations must perform a blocking receive.
-     * @return An array of Objects received on the channel.
-     */
-    protected abstract Object[] receive();
-
-    /**
-     * Called on the listener implementation when pipeline processing error has occured.
-     * @param initialMessage The message reference that was initialy supplied to the pipeline.
-     * @param processor The processor raised the error.
-     * @param error The error.
-     */
-    protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
-
-    /**
-     * Called on the listener implementation when pipeline processing of a message is complete.
-     * @param initialMessage The message reference that was initialy supplied to the pipeline.
-     */
-    protected abstract void processingComplete(Object initialMessage);
-    
-    /**
-     * Close the listener implemenation.
-     * <p/>
-     * Allows the listener to perform relevant close/cleanup tasks.
-     */
-    protected abstract void close();
-
-    /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
-    }
-
-    /**
-     * Decrement the active thread count.
-     */
-    private void decThreads() {
-        m_iQthr--;
-    }
-    
-    /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload message received
-     * by the listener implementation.
-     * 
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
-     */
-    private class ActionProcessingPipeline implements Runnable {
-        
-        private Object initialMessage;
-             
-        /**
-         * Private constructor.
-         * @param initialMessage The inital processing target message.
-         */
-        private ActionProcessingPipeline(Object initialMessage) {
-            this.initialMessage = initialMessage;
-        }
-
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            ActionProcessor currentProcessor = null;
-            
-            // Increment the active thread count for the listener on starting...
-            incThreads();
-            
-            try {
-                Object message = initialMessage;
-
-                // Run the message through each ActionProcessor...
-                for(String action : m_oActions) {
-                    ActionDefinition actionDefinition;
-                    Object processingResult = null;
-
-                    currentAction = action.trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if(actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-                    
-                    // The processing result of each action feeds into the processing of the next action...
-                    currentProcessor = actionDefinition.getProcessor();
-                    try {
-                        processingResult = currentProcessor.process(message);
-                    } catch (Exception e) {
-                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
-                        throw e;
-                    }
-                    
-                    if(message == null && action != m_oActions[m_oActions.length - 1]) {
-                        logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
-                        break;
-                    }
-                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
-                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-                    
-                    // Setup the message for processing by the next processor...
-                    message = processingResult;
-                }
-            } catch(Throwable thrown) {
-                processingError(initialMessage, currentProcessor, thrown);
-                logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            }
-            
-            processingComplete(initialMessage);
-            
-            // Decrement the active thread count for the listener on completion...
-            decThreads();
-        }
-    }
-    
-} // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-02 21:24:19 UTC (rev 6533)
@@ -36,7 +36,7 @@
  * 
  * @author Esteban
  */
-public abstract class AbstractPoller extends AbstractListener {
+public abstract class AbstractPoller extends AbstractActiveListener {
 
     // You can override these values at constructor time of your
     // derived class after calling super(GpListener,DomElement)

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-02 21:24:19 UTC (rev 6533)
@@ -37,7 +37,6 @@
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
 import org.jboss.soa.esb.command.CommandQueue;
 import org.jboss.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.command.JmsCommandQueue;
 import org.jboss.soa.esb.common.Configuration;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;




More information about the jboss-svn-commits mailing list