[jboss-svn-commits] JBL Code SVN: r8278 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Dec 13 05:13:53 EST 2006
Author: estebanschifman
Date: 2006-12-13 05:13:49 -0500 (Wed, 13 Dec 2006)
New Revision: 8278
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
ActionProcessingPipeline now extends Observable - new transport agnostic MessageAwareListener
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-12-13 08:46:29 UTC (rev 8277)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-12-13 10:13:49 UTC (rev 8278)
@@ -3,6 +3,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Observable;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
@@ -19,7 +20,7 @@
* @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
* @since Version 4.0
*/
-public class ActionProcessingPipeline implements Runnable
+public class ActionProcessingPipeline extends Observable implements Runnable
{
private boolean processingComplete = false;
@@ -125,8 +126,12 @@
catch(IllegalAccessException e)
{ _logger.error(prematureTermination("unable to access method"),e); }
catch (IllegalArgumentException e) {
- } finally {
+ }
+ finally
+ {
processingComplete = true;
+ notifyObservers(new Integer(-1));
+ setChanged();
}
}
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-13 08:46:29 UTC (rev 8277)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-13 10:13:49 UTC (rev 8278)
@@ -0,0 +1,260 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+import java.lang.reflect.Method;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Esb Message aware transport independent listener.
+ * <p/> Relies on the CourierFactory to obtain an appropriate Courier
+ * for the EPR this listener will be listening on
+ * <br/>Keeps a thread pool to instantiate ActionProcessingPipelines whenever a Message is received
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+
+public class MessageAwareListener implements Runnable, Observer
+{
+
+ /**
+  * Creates a listener bound to its controlling process and its configuration.
+  * <p/>Sets the built-in defaults (a maximum of 1 thread when none is configured,
+  * 50 milliseconds between checks for a free thread) and then validates
+  * the configuration by calling checkMyParms().
+  *
+  * @param controller EsbListenerController - the controlling process
+  * @param config ConfigTree - holds the 'static' configuration for this instance
+  * @throws ConfigurationException - whatever checkMyParms() reports
+  */
+ public MessageAwareListener(EsbListenerController controller, ConfigTree config)
+     throws ConfigurationException
+ {
+     // collaborators first: checkMyParms() reads the configuration tree
+     this._config = config;
+     this._controller = controller;
+
+     // defaults that checkMyParms() and run() fall back on
+     this._defaultMaxThreads = 1;
+     this._millisToWaitForThread = 50;
+
+     checkMyParms();
+ } // _______________________________
+
+ /**
+  * Observer callback that keeps track of how many pool threads are busy.
+  * <p/>An Integer argument is added to the running thread count (child pipelines
+  * are expected to send -1 when their run() method ends); any other argument is ignored.
+  * The count is what prevents run() from picking up Messages when there are
+  * no available threads in the pool.
+  * <p/>java.util.Observable delivers the notification on the thread that called
+  * notifyObservers(), so several pool threads may arrive here at the same time.
+  * The method is synchronized so that concurrent adjustments are not lost.
+  * <p/>NOTE(review): Observable.notifyObservers() only delivers when setChanged()
+  * was called beforehand - confirm that ActionProcessingPipeline invokes them in
+  * that order, otherwise the count is never decremented and pickup stalls.
+  *
+  * @param o Observable - sender of the notification (not inspected)
+  * @param arg Object - Integer delta to apply to the running thread count
+  */
+ public synchronized void update(Observable o, Object arg)
+ {
+     if (arg instanceof Integer)
+     {
+         _qRunningThreads += ((Integer)arg).intValue();
+         _logger.debug("Thread returned to pool("+_qRunningThreads+"/"+_maxThreads+")");
+     }
+ } //________________________________
+
+ /**
+  * Loops until the controlling process says to stop.
+  * <br/>Registers the service, creates the thread pool, then invokes the pickup
+  * Courier to obtain incoming ESB Messages.
+  * <br/>When one is received, instantiates an action processing pipeline and hands
+  * it to the thread pool.
+  * <br/>On the way out (also when the loop ends with a runtime exception) the
+  * service is unregistered and the thread pool is told to shut down.
+  */
+ public void run()
+ {
+     _logger.debug("run() method of "+this.getClass().getSimpleName()
+         +" started on thread "+Thread.currentThread().getName());
+
+     try
+     {
+         if (registerQuietly())
+         {
+             // Instantiate the thread pool and set active count to zero
+             _execService = Executors.newFixedThreadPool(_maxThreads);
+             synchronized (this) { _qRunningThreads = 0; }
+
+             pickUpLoop();
+         }
+     }
+     finally
+     {
+         try { unregisterProcess(); }
+         catch (Exception re)
+         {
+             _logger.warn("Could not un register service " + re.getLocalizedMessage());
+         }
+         // shutdown() stops new submissions and lets already submitted pipelines
+         // finish; it does not block until they are done
+         if (null!=_execService)
+             _execService.shutdown();
+     }
+
+     _logger.debug("run() method of "+this.getClass().getSimpleName()
+         +" finished on thread "+Thread.currentThread().getName());
+ } // _______________________________
+
+ /**
+  * Registers the service, logging (not propagating) any failure.
+  * @return boolean - true if registerProcess() completed without exception
+  */
+ private boolean registerQuietly()
+ {
+     try
+     {
+         registerProcess();
+         return true;
+     }
+     catch (Exception re)
+     {
+         _logger.fatal("Could not register service " + re.getLocalizedMessage(),re);
+         return false;
+     }
+ } // _______________________________
+
+ /**
+  * Picks up Messages and dispatches them for as long as the controller allows.
+  * <br/>Ends early if the thread is interrupted while waiting for a free pool thread.
+  */
+ private void pickUpLoop()
+ {
+     while (_controller.continueLooping())
+     {
+         // Only pickup a message when a thread is available
+         if (allThreadsBusy())
+         {
+             try { Thread.sleep(_millisToWaitForThread); }
+             catch (InterruptedException e)
+             {
+                 // leave the interrupt visible to the owner of this thread
+                 Thread.currentThread().interrupt();
+                 break;
+             }
+             continue;
+         }
+
+         long lWait = _controller.millisToWait();
+
+         // Just in case (this should never happen - it's a safety net)
+         if (lWait < 1)
+             continue;
+
+         Message message = null;
+         try { message = _pickUpCourier.pickup(lWait); }
+         catch (CourierTimeoutException e) { continue; }
+         catch (CourierException e)
+         {
+             // pass the exception as Throwable so the stack trace is logged
+             _logger.error("Problems picking up message", e);
+             continue;
+         }
+
+         if (null!=message)
+             dispatch(message);
+     }
+ } // _______________________________
+
+ /**
+  * @return boolean - true when the running thread count has reached the pool size
+  */
+ private synchronized boolean allThreadsBusy()
+ {
+     return _qRunningThreads >= _maxThreads;
+ } // _______________________________
+
+ /**
+  * Wraps the Message in an ActionProcessingPipeline and submits it to the pool.
+  * <br/>The running thread count is incremented before submission; if the
+  * submission itself fails the claimed slot is given back, because in that case
+  * the pipeline never runs and so can never report its end.
+  * @param message Message - the ESB Message that was picked up
+  */
+ private void dispatch(Message message)
+ {
+     boolean bClaimed = false;
+     try
+     {
+         ActionProcessingPipeline chain = new ActionProcessingPipeline(message,_config);
+         chain.addObserver(this);
+
+         int iRunning;
+         // the counter is also adjusted through update(), which the Observable
+         // calls on the thread that runs the pipeline
+         synchronized (this) { iRunning = ++_qRunningThreads; }
+         bClaimed = true;
+         _logger.debug("Thread claimed from pool("+iRunning+"/"+_maxThreads+")");
+
+         _execService.execute(chain);
+     }
+     catch (Exception e)
+     {
+         _logger.error("Unable to launch action processing pipeline", e);
+         if (bClaimed)
+             synchronized (this) { _qRunningThreads--; }
+     }
+ } // _______________________________
+
+ /**
+  * Check for mandatory and optional attributes in parameter tree.
+  * <br/>The service category name and service name are mandatory.
+  * <br/>The maximum number of threads is optional: when it is missing, not a
+  * number, or not greater than zero, the default value is used instead
+  * (a thread pool cannot be created with a size below 1).
+  *
+  * @throws ConfigurationException -
+  *             if a mandatory attribute is missing
+  */
+ protected void checkMyParms() throws ConfigurationException
+ {
+     _eprCategoryName = obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
+     _eprName = obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);
+
+     boolean bValid = false;
+     String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+     if (!Util.isNullString(attr))
+     {
+         try
+         {
+             int iThreads = Integer.parseInt(attr);
+             // Executors.newFixedThreadPool() rejects sizes below 1
+             if (iThreads > 0)
+             {
+                 _maxThreads = iThreads;
+                 bValid = true;
+             }
+         }
+         catch (NumberFormatException e) { /* not a number - default is used below */ }
+     }
+
+     if (!bValid)
+     {
+         _maxThreads = _defaultMaxThreads;
+         _logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+             +" - Using default value of <"+_maxThreads+">");
+     }
+
+ } // ________________________________
+
+ /**
+  * Registers this listener's configuration and EPR with the controller and then
+  * asks the CourierFactory for the pickup courier of that EPR.
+  * <br/>The registered flag is only raised once register() has returned normally.
+  *
+  * @throws Exception - whatever the controller or the CourierFactory throws
+  */
+ private void registerProcess() throws Exception
+ {
+     // stays false if register() throws
+     this._bRegistered = false;
+     this._controller.register(this._config, this._epr);
+     this._bRegistered = true;
+
+     this._pickUpCourier = CourierFactory.getPickupCourier(this._epr);
+ } // ________________________________
+
+ private void unregisterProcess() throws Exception
+ {
+ if (null!=_pickUpCourier)
+ try
+ {
+ Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
+ cleanMethod.invoke(_pickUpCourier, new Object[] {});
+ }
+ catch (NoSuchMethodException e) {/* OK Just don't invoke it */ }
+ catch (Exception e)
+ {
+ _logger.error("Problems invoking courier.clean() Method",e);
+ }
+
+ if (_bRegistered)
+ _controller.unRegister(_eprCategoryName, _eprName, _epr);
+
+ } // ________________________________
+
+ protected String obtainAttribute(String p_sAtt, String p_sDefault)
+ throws ConfigurationException
+ {
+ _logger.info("Reading value for " + p_sAtt);
+ String sVal = _config.getAttribute(p_sAtt);
+ if ((null == sVal) && (null == p_sDefault))
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+
+ /**
+  * Blocks the calling thread until run() has created the thread pool,
+  * checking every 50 milliseconds.
+  * <br/>An interrupt does not end the wait, but it is no longer lost: the
+  * interrupt status of the calling thread is restored before returning.
+  * <br/>NOTE(review): run() only creates the pool after a successful registration,
+  * so this looks like it never returns when registration fails - confirm with callers.
+  */
+ public void waitUntilReadyToPickup()
+ {
+     boolean bInterrupted = false;
+     try
+     {
+         while (null==_execService)
+         {
+             try { Thread.sleep(50); }
+             catch (InterruptedException e)
+             {
+                 // sleep() cleared the flag: remember it and keep waiting
+                 bInterrupted = true;
+                 _logger.warn(this.getClass().getSimpleName()+" startup interrupted", e);
+             }
+         }
+     }
+     finally
+     {
+         if (bInterrupted)
+             Thread.currentThread().interrupt();
+     }
+ } //________________________________
+
+
+ // Configuration tree received in the constructor; attributes are read from it and
+ // it is handed to the controller on register() and to every pipeline created
+ protected ConfigTree _config = null;
+ // Controlling process: decides whether to keep looping and how long to wait
+ // for a Message, and performs register() / unRegister()
+ protected EsbListenerController _controller = null;
+
+ // Service category name and service name, read from the configuration in checkMyParms()
+ protected String _eprCategoryName;
+ protected String _eprName;
+ // EPR passed to the controller and to the CourierFactory
+ // NOTE(review): never assigned in this class - presumably set elsewhere; confirm
+ protected EPR _epr;
+
+ // Size of the fixed thread pool created in run()
+ protected int _maxThreads;
+ // Number of pool threads currently claimed: incremented in run(), adjusted in update()
+ protected int _qRunningThreads;
+ // Pool size used when the configured value is missing or invalid (constructor sets 1)
+ protected int _defaultMaxThreads;
+ // Milliseconds run() sleeps when no pool thread is available (constructor sets 50)
+ protected long _millisToWaitForThread;
+ // Thread pool that runs the pipelines; stays null until run() creates it
+ protected ExecutorService _execService;
+ // True once _controller.register() has returned normally
+ protected boolean _bRegistered;
+
+ // Courier used to pick up Messages, obtained from the CourierFactory in registerProcess()
+ protected PickUpOnlyCourier _pickUpCourier;
+
+ // Logger shared by all instances of this class
+ protected static transient Logger _logger = Logger.getLogger(MessageAwareListener.class);
+
+}
More information about the jboss-svn-commits
mailing list