[jboss-svn-commits] JBL Code SVN: r7324 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 2 09:24:26 EST 2006
Author: jokum
Date: 2006-11-02 09:24:23 -0500 (Thu, 02 Nov 2006)
New Revision: 7324
Added:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Modified:
labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
Log:
HttpListener added to old architecture
Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 14:24:23 UTC (rev 7324)
@@ -23,6 +23,8 @@
package org.jboss.soa.esb.listeners.old;
import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.actions.ActionDefinition;
@@ -63,6 +65,12 @@
protected ActionDefinitionFactory m_oActionDefinitionFactory;
protected MessageFactory m_oMsgFactory;
+ /* The name of the attribute in the configuration containing the number of threads in the pool*/
+ public static final String PARM_MAX_THREADS = "maxThreads";
+
+ /* The thread pool executing the action processing pipeline*/
+ protected ExecutorService pipelineExecutorPool = null;
+
protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
logger = Logger.getLogger(this.getClass());
@@ -75,11 +83,21 @@
String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
if(!sAtt.trim().equals("")) {
m_oActions = sAtt.split(",");
- }
+ }
- sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ //The number of threads configured
+ String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+ if (maxThreads != null) {
+ int iMax = Integer.parseInt(maxThreads);
+ this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ if (this.logger.isInfoEnabled()){
+ this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
+ }
+ } else {
+ this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
+ }
+ this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+
} // __________________________________
/**
@@ -95,21 +113,10 @@
try { Thread.sleep(500); }
catch(InterruptedException e) {/* ok do nothing */}
} else {
- for (Object currentObj : processList) {
- if (m_iQthr >= m_iMaxThr) {
- logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
- try {
- Thread.sleep(m_iSleepForThreads);
- } catch (InterruptedException e) {
- return;
- }
- break;
- }
-
+ for (Object currentObj : processList) {
// Spawn a thread and push the message message through the pipeline...
ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
- new Thread(runner).start();
- incThreads();
+ this.pipelineExecutorPool.submit(runner);
}
}
}
@@ -157,23 +164,9 @@
* <p/>
* Allows the listener to perform relevant close/cleanup tasks.
*/
- protected abstract void close();
+ protected abstract void close();
/**
- * Increment the active thread count.
- */
- private void incThreads() {
- m_iQthr++;
- }
-
- /**
- * Decrement the active thread count.
- */
- private void decThreads() {
- m_iQthr--;
- }
-
- /**
* Action Processing Pipeline.
* <p/>
* Runs the actions in a listeners "actions" config on a message payload message received
@@ -182,7 +175,7 @@
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
* @since Version 4.0
*/
- private class ActionProcessingPipeline implements Runnable {
+ protected class ActionProcessingPipeline implements Runnable {
private Object initialObject;
@@ -190,7 +183,7 @@
* Private constructor.
* @param pMessage The inital processing target message.
*/
- private ActionProcessingPipeline(Object obj) {
+ protected ActionProcessingPipeline(Object obj) {
initialObject = obj;
}
@@ -253,10 +246,7 @@
} catch(Throwable thrown) {
processingError(initialObject, currentProcessor, thrown);
logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "]. Action [" + currentAction + "] threw an exception.", thrown);
- } finally {
- // Decrement the active thread count for the listener on completion...
- decThreads();
- }
+ }
}
/**
Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java 2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractPassiveListener.java 2006-11-02 14:24:23 UTC (rev 7324)
@@ -0,0 +1,28 @@
+package org.jboss.soa.esb.listeners.old;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ * Base class to be implmented by listener implementations which use a channel
+ * implementation doing the listening stuff like periodically receiving on a
+ * queue.
+ *
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
+ *
+ */
+public abstract class AbstractPassiveListener extends AbstractListener {
+
+ protected AbstractPassiveListener(GpListener p_oDad, ConfigTree p_oParms,
+ ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(p_oDad, p_oParms, actionDefinitionFactory);
+ }
+
+ @Override
+ protected Object[] receive() {
+ // nothing to be done here because channel implementation is taking care
+ // of blocking receive stuff
+ return null;
+ }
+
+}
Added: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 14:07:16 UTC (rev 7323)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 14:24:23 UTC (rev 7324)
@@ -0,0 +1,143 @@
+package org.jboss.soa.esb.listeners.old;
+
+import javax.management.MBeanServer;
+
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
+/**
+ *
+ *
+ * @author Johan Kumps
+ *
+ */
+public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
+
+ /* The url to listen on */
+ public String listenHttpUrl = null;
+
+ /* The url this listener will listen on */
+ private static final String LISTEN_HTTP_URL = "listenHttpURL";
+
+ /* The default transport this listener will listen on */
+ private static final String transport = "http";
+
+ /* The default hostname this listener will listen on */
+ private static final String host = "localhost";
+
+ /* The default port this listener will listen on */
+ private static final int port = 5400;
+
+ public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
+ {
+ super(p_oDad, p_oParms, actionDefinitionFactory);
+ this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
+ // initialize the HTTP server
+ this.initServer();
+ }
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if actionClass not specified or not in classpath or invalid
+ * int values for maxThreads or pollLatencySecs
+ *
+ */
+ protected void checkParams() throws Exception {
+ // listener url
+ this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
+ this.getDefaultListenHttpUrl());
+ }
+
+ public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+ //Retrieving the real payload of this invocationRequest
+ Object payload = invocationRequest.getParameter();
+
+ if (this.logger.isInfoEnabled()) {
+ this.logger
+ .info("HttpInvocationListener is invoked...The given payload is : "
+ + payload);
+ }
+ //Start the action processing pipeline
+ ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
+ this.pipelineExecutorPool.submit(pipelineRunner);
+
+ return payload;
+ }
+
+ /**
+ * Method returning the default listenHttpUrl for this HttpListener instance
+ *
+ * @return the default listen url
+ */
+ private String getDefaultListenHttpUrl() {
+ return HttpListener.transport + "://" + HttpListener.host + ":"
+ + HttpListener.port;
+ }
+
+ private void initServer() throws Exception {
+ InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("Starting remoting server with locator uri of: "
+ + this.listenHttpUrl);
+ }
+ Connector connector = new Connector(locator);
+ connector.create();
+ connector.addInvocationHandler("HttpInvocationHandler", this);
+
+ // Starting the server deamon
+ connector.start();
+
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("HttpListener deamon started successfully!");
+ }
+
+ }
+
+ @Override
+ protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void processingComplete(Object initialMsg) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setMBeanServer(MBeanServer arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void setInvoker(ServerInvoker arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addListener(InvokerCallbackHandler arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void removeListener(InvokerCallbackHandler arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
More information about the jboss-svn-commits
mailing list