[jboss-svn-commits] JBL Code SVN: r6533 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Oct 2 17:24:22 EDT 2006
Author: jokum
Date: 2006-10-02 17:24:19 -0400 (Mon, 02 Oct 2006)
New Revision: 6533
Added:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java
Removed:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
Modified:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Log:
AbstractListener refactored to AbstractActiveListener
Copied: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java (from rev 6509, labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java)
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-01 10:55:48 UTC (rev 6509)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java 2006-10-02 21:24:19 UTC (rev 6533)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Listener base class whose {@link #run()} loop pulls messages with {@link #receive()} and starts one pipeline thread per message.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractActiveListener extends AbstractListener{
+
+ protected AbstractActiveListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super (p_oDad, p_oParms, actionDefinitionFactory); // all three arguments are forwarded unchanged to the superclass constructor
+ } // __________________________________
+
+ /**
+ * Main loop of this Runnable. <p/> Repeats while the controlling class (m_oDad)
+ * reports continueLooping(): calls receive() and starts a new Thread running an
+ * ActionProcessingPipeline for each returned message. <p/> Threads already started
+ * are not stopped when the loop ends; close() is called once the loop exits normally.
+ */
+ public void run() {
+ while (m_oDad.continueLooping()) {
+ Object[] processList = receive(); // NOTE(review): a null return would throw NullPointerException in the for-each below
+
+ for (Object message : processList) {
+ if (m_iQthr >= m_iMaxThr) { // thread limit reached; m_iQthr/m_iMaxThr are not declared in this file - presumably inherited from AbstractListener
+ logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return; // leaves run() immediately: close() below is not reached and the interrupt flag is not restored
+ }
+ break; // NOTE(review): abandons this message and the rest of processList - nothing here re-queues them; confirm receive() redelivers
+ }
+
+ // Spawn a thread and push the message through the pipeline...
+ ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
+ new Thread(runner).start();
+ }
+ }
+
+ close();
+ }
+
+ /**
+ * Receive messages from the underlying channel implementation.
+ * <p/>
+ * Implementations must perform a blocking receive.
+ * @return An array of Objects received on the channel.
+ */
+ protected abstract Object[] receive();
+
+ /**
+ * Called on the listener implementation when a pipeline processing error has occurred.
+ * @param initialMessage The message reference that was initially supplied to the pipeline.
+ * @param processor The processor that raised the error.
+ * @param error The error.
+ */
+ protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+ /**
+ * Called on the listener implementation when pipeline processing of a message is complete.
+ * @param initialMessage The message reference that was initially supplied to the pipeline.
+ */
+ protected abstract void processingComplete(Object initialMessage);
+
+ /**
+ * Close the listener implementation.
+ * <p/>
+ * Allows the listener to perform relevant close/cleanup tasks.
+ */
+ protected abstract void close();
+
+} // ____________________________________________________________________________
Deleted: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-02 21:24:19 UTC (rev 6533)
@@ -1,230 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.soa.esb.listeners;
-
-import java.util.*;
-
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinition;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.helpers.*;
-
-/**
- * Base abstract message listener implementation.
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
-public abstract class AbstractListener implements Runnable {
-
- // You can override these values at constructor time of your
- // derived class after calling super(GpListener,DomElement)
- protected int m_iSleepForThreads = 3000; // default sleep if no threads available
- protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
-
- protected int m_iQthr = 0, m_iMaxThr;
-
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger logger;
-
- protected GpListener m_oDad;
-
- protected DomElement listenerConfig;
-
- protected String[] m_oActions;
-
- protected ActionDefinitionFactory m_oActionDefinitionFactory;
-
- protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-
- logger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- listenerConfig = p_oParms.cloneObj();
- m_oActionDefinitionFactory = actionDefinitionFactory;
- m_oThrGrp = new ThreadGroup(listenerConfig.getName());
-
- String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
- m_oActions = sAtt.split(",");
-
- if(m_oActions.length == 0) {
- throw new ConfigurationException("Listener 'actions' list must be specified.");
- }
-
- sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
- } // __________________________________
-
- /**
- * Implement run method for this Runnable <p/> Will continue to run until
- * controlling class (ref in m_oDad) indicates no more looping allowed for
- * all child classes <p/> This condition will not prevent child processes to
- * finish normally
- */
- public void run() {
- while (m_oDad.continueLooping()) {
- Object[] processList = receive();
-
- for (Object message : processList) {
- if (m_iQthr >= m_iMaxThr) {
- logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
- }
-
- // Spawn a thread and push the message message through the pipeline...
- ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
- new Thread(runner).start();
- }
- }
-
- close();
- }
-
- /**
- * Receive message from underlying channel implementation.
- * <p/>
- * Implementations must perform a blocking receive.
- * @return An array of Objects received on the channel.
- */
- protected abstract Object[] receive();
-
- /**
- * Called on the listener implementation when pipeline processing error has occured.
- * @param initialMessage The message reference that was initialy supplied to the pipeline.
- * @param processor The processor raised the error.
- * @param error The error.
- */
- protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
-
- /**
- * Called on the listener implementation when pipeline processing of a message is complete.
- * @param initialMessage The message reference that was initialy supplied to the pipeline.
- */
- protected abstract void processingComplete(Object initialMessage);
-
- /**
- * Close the listener implemenation.
- * <p/>
- * Allows the listener to perform relevant close/cleanup tasks.
- */
- protected abstract void close();
-
- /**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
- }
-
- /**
- * Decrement the active thread count.
- */
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
- * Action Processing Pipeline.
- * <p/>
- * Runs the actions in a listeners "actions" config on a message payload message received
- * by the listener implementation.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
- private class ActionProcessingPipeline implements Runnable {
-
- private Object initialMessage;
-
- /**
- * Private constructor.
- * @param initialMessage The inital processing target message.
- */
- private ActionProcessingPipeline(Object initialMessage) {
- this.initialMessage = initialMessage;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
- String currentAction = null;
- ActionProcessor currentProcessor = null;
-
- // Increment the active thread count for the listener on starting...
- incThreads();
-
- try {
- Object message = initialMessage;
-
- // Run the message through each ActionProcessor...
- for(String action : m_oActions) {
- ActionDefinition actionDefinition;
- Object processingResult = null;
-
- currentAction = action.trim();
- actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
- if(actionDefinition == null) {
- throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
- }
-
- // The processing result of each action feeds into the processing of the next action...
- currentProcessor = actionDefinition.getProcessor();
- try {
- processingResult = currentProcessor.process(message);
- } catch (Exception e) {
- GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
- throw e;
- }
-
- if(message == null && action != m_oActions[m_oActions.length - 1]) {
- logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
- break;
- }
- // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
- GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-
- // Setup the message for processing by the next processor...
- message = processingResult;
- }
- } catch(Throwable thrown) {
- processingError(initialMessage, currentProcessor, thrown);
- logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
- }
-
- processingComplete(initialMessage);
-
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
- }
-
-} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-10-02 21:24:19 UTC (rev 6533)
@@ -36,7 +36,7 @@
*
* @author Esteban
*/
-public abstract class AbstractPoller extends AbstractListener {
+public abstract class AbstractPoller extends AbstractActiveListener {
// You can override these values at constructor time of your
// derived class after calling super(GpListener,DomElement)
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-02 21:19:47 UTC (rev 6532)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-02 21:24:19 UTC (rev 6533)
@@ -37,7 +37,6 @@
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
import org.jboss.soa.esb.command.CommandQueue;
import org.jboss.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.command.JmsCommandQueue;
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
More information about the jboss-svn-commits
mailing list