[jboss-svn-commits] JBL Code SVN: r6681 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Oct 7 17:15:41 EDT 2006
Author: jokum
Date: 2006-10-07 17:15:39 -0400 (Sat, 07 Oct 2006)
New Revision: 6681
Modified:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
Implementation HttpListener
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-07 21:15:39 UTC (rev 6681)
@@ -23,6 +23,9 @@
package org.jboss.soa.esb.listeners;
import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
@@ -35,199 +38,194 @@
import org.jboss.soa.esb.message.format.MessageFactory;
/**
- * Base abstract listener implementation.
+ * Base abstract message listener implementation. This is a 'passive' message
+ * listener to be used when the underlying channel implementation implements the
+ * blocking receive itself, for example when the concrete listener class is
+ * implemented using the JBoss Remoting channel implementations (such as
+ * HttpListener).
+ *
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
* @since Version 4.0
*/
-public abstract class AbstractListener implements Runnable {
-
- // You can override these values at constructor time of your
- // derived class after calling super(GpListener,DomElement)
- protected int m_iSleepForThreads = 3000; // default sleep if no threads available
- protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+public abstract class AbstractListener implements Runnable {
- protected int m_iQthr = 0, m_iMaxThr;
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
- protected ThreadGroup m_oThrGrp = null;
- protected Logger logger;
- protected GpListener m_oDad;
- protected DomElement listenerConfig;
- protected String[] m_oActions;
- protected ActionDefinitionFactory m_oActionDefinitionFactory;
- protected MessageFactory m_oMsgFactory;
+ protected int m_iUpperThreadLimit = 10; // just in case - override if you
- protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-
- logger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- listenerConfig = p_oParms.cloneObj();
- m_oActionDefinitionFactory = actionDefinitionFactory;
- m_oMsgFactory = MessageFactory.getInstance();
- m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+ // wish
- String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
- m_oActions = sAtt.split(",");
-
- if(m_oActions.length == 0) {
- throw new ConfigurationException("Listener 'actions' list must be specified.");
- }
+ protected Logger logger;
- sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
- } // __________________________________
+ protected GpListener m_oDad;
- /**
- * Implement run method for this Runnable <p/> Will continue to run until
- * controlling class (ref in m_oDad) indicates no more looping allowed for
- * all child classes <p/> This condition will not prevent child processes to
- * finish normally
- */
- public void run() {
- while (m_oDad.continueLooping()) {
- Object[] processList = receive();
- if (null==processList)
- try { Thread.sleep(500); }
- catch(InterruptedException e) {/* ok do nothing */}
- else
- for (Object currentObj : processList) {
- if (m_iQthr >= m_iMaxThr) {
- logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
- }
+ protected DomElement listenerConfig;
- // Spawn a thread and push the message message through the pipeline...
- ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
- new Thread(runner).start();
- }
- }
-
- close();
- }
-
- /**
- * Receive message from underlying channel implementation.
- * <p/>
- * Implementations must perform a blocking receive.
- * @return An array of Objects received on the channel.
- */
- protected abstract Object[] receive();
+ protected String[] m_oActions;
- /**
- * Called on the listener implementation when pipeline processing error has occured.
- * @param initialMsg The message that was initialy supplied to the pipeline.
- * @param processor The processor raised the error.
- * @param error The error.
- */
- protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
+ protected ActionDefinitionFactory m_oActionDefinitionFactory;
- /**
- * Called on the listener implementation when pipeline processing of a message is complete.
- * @param initialMsg The message that was initialy supplied to the pipeline.
- */
- protected abstract void processingComplete(Object initialMsg);
-
- /**
- * Close the listener implemenation.
- * <p/>
- * Allows the listener to perform relevant close/cleanup tasks.
- */
- protected abstract void close();
-
- /**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
- }
+ protected MessageFactory m_oMsgFactory;
- /**
- * Decrement the active thread count.
- */
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
- * Action Processing Pipeline.
- * <p/>
- * Runs the actions in a listeners "actions" config on a message payload message received
- * by the listener implementation.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
- private class ActionProcessingPipeline implements Runnable {
-
- private Object initialObject;
-
- /**
- * Private constructor.
- * @param pMessage The inital processing target message.
- */
- private ActionProcessingPipeline(Object obj) {
- initialObject = obj;
- }
+ protected ExecutorService pipelineExecutorPool = null;
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
- String currentAction = null;
- ActionProcessor currentProcessor = null;
-
- // Increment the active thread count for the listener on starting...
- incThreads();
-
- try {
- Message message = m_oMsgFactory.getMessage();
- ActionUtils.putCurrentObject(message,initialObject);
+ protected AbstractListener(GpListener p_oDad, DomElement p_oParms,
+ ActionDefinitionFactory actionDefinitionFactory) throws Exception {
- // Run the message through each ActionProcessor...
- for(String action : m_oActions) {
- ActionDefinition actionDefinition;
+ logger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ listenerConfig = p_oParms.cloneObj();
+ m_oActionDefinitionFactory = actionDefinitionFactory;
+ m_oMsgFactory = MessageFactory.getInstance();
- currentAction = action.trim();
- actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
- if(actionDefinition == null) {
- throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
- }
-
- // The processing result of each action feeds into the processing of the next action...
- currentProcessor = actionDefinition.getProcessor();
- try {
- ActionUtils.copyCurrentToPrevious(message);
- message = currentProcessor.process(message);
- } catch (Exception e) {
- GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
- throw e;
- }
-
- if(message == null && action != m_oActions[m_oActions.length - 1]) {
- logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
- break;
- }
- // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
- GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-
- // Setup the message for processing by the next processor...
- message.getBody().remove(ActionUtils.BEFORE_ACTION);
- }
- } catch(Throwable thrown) {
- processingError(initialObject, currentProcessor, thrown);
- logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
- }
-
- processingComplete(initialObject);
-
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
- }
-
+ String sAtt = GpListener.obtainAtt(listenerConfig,
+ GpListener.PARM_ACTIONS, "");
+ m_oActions = sAtt.split(",");
+
+ if (m_oActions.length == 0) {
+ throw new ConfigurationException(
+ "Listener 'actions' list must be specified.");
+ }
+
+ sAtt = GpListener.obtainAtt(listenerConfig,
+ GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ int m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+
+ this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+ } // __________________________________
+
+ /**
+ * Implement run method for this Runnable <p/> This is a passive listener:
+ * the underlying channel implementation performs the blocking receive
+ * itself, so this method no longer loops on the controlling class (ref in
+ * m_oDad) and intentionally does nothing
+ */
+
+ public void run() {
+ // No need to do something in the passive listeners
+ }
+
+ /**
+ * Called on the listener implementation when a pipeline processing error has
+ * occurred.
+ *
+ * @param initialObject
+ * The object reference that was initially supplied to the
+ * pipeline.
+ * @param processor
+ * The processor raised the error.
+ * @param error
+ * The error.
+ */
+ protected abstract void processingError(Object initialObject,
+ ActionProcessor processor, Throwable error);
+
+ /**
+ * Called on the listener implementation when pipeline processing of a
+ * message is complete.
+ *
+ * @param initialObject
+ * The object reference that was initially supplied to the
+ * pipeline.
+ */
+ protected abstract void processingComplete(Object initialObject);
+
+ /**
+ * Action Processing Pipeline. <p/> Runs the actions in a listeners
+ * "actions" config on a message payload message received by the listener
+ * implementation.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ protected class ActionProcessingPipeline implements Callable {
+
+ private Object initialObject;
+
+ /**
+ * Protected constructor.
+ *
+ * @param obj
+ * The initial processing target message.
+ */
+ protected ActionProcessingPipeline(Object obj) {
+ initialObject = obj;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.concurrent.Callable#call()
+ */
+ public Object call() {
+ String currentAction = null;
+ ActionProcessor currentProcessor = null;
+ Message message = m_oMsgFactory.getMessage();
+ try {
+
+ ActionUtils.putCurrentObject(message, initialObject);
+
+ // Run the message through each ActionProcessor...
+ for (String action : m_oActions) {
+ ActionDefinition actionDefinition;
+
+ currentAction = action.trim();
+ actionDefinition = m_oActionDefinitionFactory
+ .getInstance(currentAction);
+ if (actionDefinition == null) {
+ throw new java.lang.IllegalStateException(
+ "Bad Listener Configuration. No 'Actions/Action' definition for action ["
+ + currentAction + "].");
+ }
+
+ // The processing result of each action feeds into the
+ // processing of the next action...
+ currentProcessor = actionDefinition.getProcessor();
+ try {
+ ActionUtils.copyCurrentToPrevious(message);
+ message = currentProcessor.process(message);
+ } catch (Exception e) {
+ GpListener.notifyError(listenerConfig, e,
+ currentProcessor.getErrorNotification(message));
+ throw e;
+ }
+
+ if (message == null
+ && action != m_oActions[m_oActions.length - 1]) {
+ logger
+ .warn("Premature termination of action processing pipeline ["
+ + Arrays.asList(m_oActions)
+ + "]. ActionProcessor ["
+ + currentProcessor.getClass().getName()
+ + "] returned a null message result on processing of action ["
+ + currentAction + "].");
+ break;
+ }
+ // Notify on all processors. May want to do this differently
+ // in the future i.e. more selectively ...
+ GpListener.notifyOK(listenerConfig, currentProcessor
+ .getOkNotification(message));
+
+ // Setup the message for processing by the next processor...
+ }
+ } catch (Throwable thrown) {
+ processingError(initialObject, currentProcessor, thrown);
+ logger.error(
+ "Premature termination of action processing pipeline ["
+ + Arrays.asList(m_oActions) + "]. Action ["
+ + currentAction + "] threw an exception.",
+ thrown);
+ }
+
+ processingComplete(initialObject);
+
+ // return the result of the last action in the pipeline
+ return ActionUtils.currentFromMessage(message);
+
+ }
+ }
+
} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-10-07 21:15:39 UTC (rev 6681)
@@ -36,7 +36,7 @@
*
* @author Esteban
*/
-public abstract class AbstractPoller extends AbstractListener {
+public abstract class AbstractPoller extends AbstractActiveListener {
// You can override these values at constructor time of your
// derived class after calling super(GpListener,DomElement)
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-10-07 21:15:39 UTC (rev 6681)
@@ -39,7 +39,7 @@
import org.jboss.soa.esb.helpers.AppServerContext;
import org.jboss.soa.esb.helpers.DomElement;
-public class JmsQueueListener extends AbstractListener {
+public class JmsQueueListener extends AbstractActiveListener {
public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
public static final String LISTEN_JNDI_TYPE = "listenJndiType";
@@ -53,6 +53,9 @@
protected Queue m_oQueue;
protected String m_sSelector;
+ //TODO refactor to config setting
+ protected long reconnectionInterval = 2000;
+
protected MessageConsumer jmsMessageReceiver;
@@ -120,7 +123,7 @@
catch (Exception e) {
logger.error("Reconnecting to Queue", e);
try {
- Thread.sleep(m_iSleepForThreads);
+ Thread.sleep(this.reconnectionInterval);
} catch (InterruptedException e1) { // Just return
logger.error("Unexpected thread interupt exception.", e);
return null;
More information about the jboss-svn-commits
mailing list