[jboss-svn-commits] JBL Code SVN: r24497 - in labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss: soa/esb/client and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Dec 26 00:22:32 EST 2008
Author: jim.ma
Date: 2008-12-26 00:22:32 -0500 (Fri, 26 Dec 2008)
New Revision: 24497
Modified:
labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java
labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java
labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java
Log:
Refactored TomcatListener
Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java 2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java 2008-12-26 05:22:32 UTC (rev 24497)
@@ -19,9 +19,10 @@
*/
package org.jboss.internal.soa.esb.couriers;
-import java.io.ObjectInputStream;
+import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.net.URISyntaxException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.httpclient.HttpClient;
@@ -33,41 +34,56 @@
import org.apache.log4j.Logger;
import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.ContextObjectInputStream;
import org.jboss.soa.esb.util.Util;
+
/**
- * Deliver the ESB message to http endpoint and pickup ESB message from Http endpoint
+ * Deliver the ESB message to http endpoint and pickup ESB message from Http
+ * endpoint
+ *
* @author <a href="mailto:ema at redhat.com">Jim Ma</a>
*/
public class HttpCourier implements PickUpOnlyCourier, DeliverOnlyCourier {
-
+
public ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<Message>();
-
+
public static String REPLY_EPR_NAME = "reply";
-
- public static String GET_REPLY_NAME = "get";
-
+
protected static Logger logger = Logger.getLogger(InVMCourier.class);
-
+
private HTTPEpr epr;
-
+
private HttpClient httpClient = null;
-
+
+ private PostMethod method = null;
+
+ private Message repsonseMessage = null;
+
public HttpCourier(HTTPEpr epr) {
this.epr = epr;
HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
- HttpConnectionManagerParams params = new HttpConnectionManagerParams() ;
- params.setMaxTotalConnections(Integer.parseInt(epr.getThreads())) ;
+ HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+ params.setMaxTotalConnections(Integer.parseInt(epr.getThreads()));
connectionManager.setParams(params);
httpClient = new HttpClient(connectionManager);
}
+
public boolean deliver(Message message) {
if (message == null) {
- return false;
- }
-
- PostMethod method = null;
+ return false;
+ }
+
try {
- //TODO:revisit it to see if the "/" can be removed
+ if (epr.getURI().toString().endsWith("reply")) {
+ // Do nothing: the reply message will be delivered in the servlet response
+ return true;
+ }
+ } catch (URISyntaxException e1) {
+ logger.error("Failed to get uri from EPR " + epr);
+ return false;
+ }
+
+ try {
method = new PostMethod(epr.getURI().toString() + "/");
java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
@@ -84,59 +100,28 @@
try {
httpClient.executeMethod(method);
+ if (method.getResponseBody().length > 0) {
+ InputStream ins = method.getResponseBodyAsStream();
+ Serializable serial = (Serializable) new ContextObjectInputStream(
+ ins).readObject();
+ repsonseMessage = Util.deserialize(serial);
+ }
+
} catch (Exception e) {
- logger.error("Failed to deliver message :[ " + message + "] to endpoint " + epr);
+ logger.error("Failed to get response message :[ " + message + "]");
return false;
-
} finally {
method.releaseConnection();
}
return true;
}
-
+
public Message pickup(long millis) {
- Message message = null;
- if (epr.isReplyEpr()) {
- PostMethod method = null;
- try {
- method = new PostMethod(epr.getURI().toString() + "/" + GET_REPLY_NAME);
- httpClient.executeMethod(method);
- byte[] responseBytes = method.getResponseBody();
- if (responseBytes == null || responseBytes.length == 0) {
- return null;
- }
+ return repsonseMessage;
+ }
- java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(
- responseBytes);
- Serializable serial = (Serializable) new ObjectInputStream(bin)
- .readObject();
- message = Util.deserialize(serial);
- } catch (Exception e) {
- logger.error("Failed to pickup message from " + epr);
- } finally {
- method.releaseConnection();
- }
- } else {
-
- synchronized (queue) {
- message = queue.poll();
- if (message == null) {
- try {
- queue.wait(millis);
- } catch (InterruptedException e) {
-
- }
- message = queue.poll();
- }
-
- }
- }
- return message;
+ public void cleanup() {
}
-
-
- public void cleanup() {
- }
-
-}
+
+}
Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java 2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java 2008-12-26 05:22:32 UTC (rev 24497)
@@ -21,9 +21,8 @@
import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -31,48 +30,102 @@
import javax.servlet.http.HttpServletResponse;
import javax.xml.parsers.ParserConfigurationException;
-import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
-import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.message.ActionProcessingPipeline;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.ContextObjectInputStream;
import org.jboss.soa.esb.util.Util;
-/**
- * This servlet processes the ESB message delivered to http endpoint and prepare the
- * reply message to client
- * <p> It will server for these request url :
- * <p> http://localhost:8765/tomcat/listener/
- * <p> http://localhost:8765/tomcat/listener/reply
- * <p> http://localhost:8765/tomcat/listener/reply/get
+/**
+ * This servlet processes the ESB message delivered to the HTTP endpoint and returns the message
+ * processed by the ActionProcessingPipeline
* @author <a href="mailto:ema at redhat.com">Jim Ma</a>
*/
public class HttpESBListenerServlet extends HttpServlet {
HttpCourier requestCourier = null;
HttpCourier responseCourier = null;
+ private boolean synchronous = false;
+
+ private ActionProcessingPipeline pipeline ;
+ private EPR epr;
+ private ConfigTree config;
public void init() throws ServletException {
-
+ config = (ConfigTree) getServletContext().getAttribute("config");
+ if (ListenerTagNames.MEP_REQUEST_RESPONSE.equals(config.getAttribute(ListenerTagNames.MEP_ATTRIBUTE_TAG))) {
+ synchronous = true;
+ }
+
+ try
+ {
+ pipeline = new ActionProcessingPipeline(config) ;
+ //pipeline.setTransactional(transactional);
+ pipeline.initialise() ;
+
+ }
+ catch (final Exception ce)
+ {
+ throw new ServletException(ce);
+ }
+
+ ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);;
+ try {
+ epr = ListenerUtil.assembleEpr(eprElement);
+ RegistryUtil.register(config, epr);
+ } catch (Exception e) {
+ throw new ServletException(e);
+ }
+
+
}
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- service(req,resp);
+ doService(req,resp);
}
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- service(req,resp);
+ doService(req,resp);
}
- protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doService(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String requestInfo = req.getServletPath();
if ("/".equals(requestInfo)) {
- addRequestMessage(req, resp);
- } else if (("/"+ HttpCourier.REPLY_EPR_NAME + "/") .equals(requestInfo) ) {
- addResponseMessage(req, resp);
- } else if (("/"+ HttpCourier.REPLY_EPR_NAME + "/" + HttpCourier.GET_REPLY_NAME).equals(requestInfo)) {
- getResponseMessage(req, resp);
+ Serializable serial;
+ try {
+ serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
+ } catch (Exception e1) {
+ throw new ServletException(e1);
+ }
+ try {
+ Message msg = Util.deserialize(serial);
+ boolean result = pipeline.process(msg);
+ if (result && synchronous) {
+ OutputStream output = resp.getOutputStream();
+ try {
+ new ObjectOutputStream(output).writeObject(Util.serialize(msg));
+ } catch (ParserConfigurationException e) {
+ throw new ServletException(e);
+ }
+ output.flush();
+ }
+
+ if (!result) {
+ //TODO: Modify the ActionProcessingPipeline to get the Fault message from the original message
+ //Get default fault to message
+ }
+
+ } catch (Exception e) {
+ throw new ServletException("Error when deserialize the request ESB message");
+
+ }
+
} else {
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
}
@@ -80,103 +133,11 @@
}
- public void addRequestMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- if (requestCourier == null) {
- String server = req.getServerName();
- int port = req.getServerPort();
- String context = req.getContextPath();
- String requestURI = "http://" + server + ":" + port + context;
- URI uri = null;
- try {
- uri = new URI(requestURI);
- HTTPEpr epr = new HTTPEpr(uri);
- requestCourier = CourierFactory.getInstance().getHttpCourier(epr);
- } catch (URISyntaxException e) {
- //expected , do nothing
- }
-
- }
-
- Serializable serial;
+ public void destroy() {
try {
- serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
- } catch (Exception e1) {
-
- throw new ServletException(e1);
+ RegistryUtil.unregister(config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG), config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG), epr);
+ }catch (Exception e) {
+ e.printStackTrace();
}
- try {
- Message msg = Util.deserialize(serial);
- synchronized (requestCourier.queue) {
- requestCourier.queue.offer(msg);
- requestCourier.queue.notify();
- }
- } catch (Exception e) {
- throw new ServletException("Error when deserialize the request ESB message");
-
- }
}
-
- public void addResponseMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- if (responseCourier == null) {
- String server = req.getServerName();
- int port = req.getServerPort();
- String context = req.getContextPath();
- String requestURI = "http://" + server + ":" + port + context + "/" + HttpCourier.REPLY_EPR_NAME;
- URI uri = null;
- try {
- uri = new URI(requestURI);
- HTTPEpr epr = new HTTPEpr(uri);
- responseCourier = CourierFactory.getInstance().getHttpCourier(epr);
-
- } catch (URISyntaxException e) {
- //Do nothing
- }
- }
- Serializable serial = null;
- try {
- serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
- } catch (ClassNotFoundException e1) {
- throw new ServletException(e1);
- }
- try {
- Message msg = Util.deserialize(serial);
- synchronized (responseCourier.queue) {
- responseCourier.queue.offer(msg);
- responseCourier.queue.notify();
- }
- } catch (Exception e) {
- throw new ServletException("Error when deserialize the request ESB message");
-
- }
- }
-
- public void getResponseMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- if (responseCourier == null) {
- return;
- }
- Message msg = null;
- synchronized (responseCourier.queue) {
- msg = responseCourier.queue.poll();
- if (msg == null) {
- try {
- responseCourier.queue.wait(2000);
- } catch (InterruptedException e) {
- //Do nothing
- }
- msg = responseCourier.queue.poll();
- }
- }
- if (msg == null) {
- return;
- }
- java.io.OutputStream output = resp.getOutputStream();
- try {
- new ObjectOutputStream(output).writeObject(Util.serialize(msg));
- } catch (ParserConfigurationException e) {
- throw new ServletException(e);
- }
- output.flush();
-
- }
-
}
Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java 2008-12-26 05:22:32 UTC (rev 24497)
@@ -31,6 +31,7 @@
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.couriers.*;
+
import org.jboss.soa.esb.filter.FilterManager;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.util.Type;
@@ -127,14 +128,12 @@
return null;
if (epr instanceof InVMEpr)
return CourierFactory.getInstance().getInVMCourier((InVMEpr) epr);
- if (epr instanceof HTTPEpr)
- return CourierFactory.getInstance().getHttpCourier((HTTPEpr)epr);
-
+ if (epr instanceof HTTPEpr)
+ return new HttpCourier((HTTPEpr)epr);
if (epr instanceof JMSEpr)
return new JmsCourier((JMSEpr) epr, pickUpOnly);
if (epr instanceof FileEpr)
- return new FileCourier((FileEpr) epr, pickUpOnly);
-
+ return new FileCourier((FileEpr) epr, pickUpOnly);
if (epr instanceof JDBCEpr)
return new SqlTableCourier((JDBCEpr) epr, pickUpOnly);
Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2008-12-26 05:22:32 UTC (rev 24497)
@@ -24,8 +24,11 @@
import java.util.List;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.addressing.eprs.DefaultHttpReplyToEpr;
import org.jboss.internal.soa.esb.addressing.helpers.EPRHelper;
import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.internal.soa.esb.couriers.HttpCourier;
+import org.jboss.internal.soa.esb.couriers.TwoWayCourierImpl;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.addressing.EPR;
@@ -529,12 +532,22 @@
}
if (courier.deliver(message)) {
if (synchronous) {
- courier.cleanup() ;
+
// JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
// do we need to do this for synchronous calls? Vagueries of Couriers?
+ //This is a big hack for the HTTP EPR. Look into whether we can create another TwoWayCourierImpl to do this instead
+ if (replyToEPR instanceof DefaultHttpReplyToEpr) {
+ TwoWayCourierImpl twoWayCourier = (TwoWayCourierImpl)courier;
+ HttpCourier httpCourier = (HttpCourier)twoWayCourier.getDeliverCourier();
+ return httpCourier.pickup(timeout);
+
+ }
+ courier.cleanup() ;
+
courier.setReplyToEpr(replyToEPR);
+
return courier.pickup(timeout);
} else {
return message;
Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java 2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java 2008-12-26 05:22:32 UTC (rev 24497)
@@ -33,11 +33,14 @@
import org.apache.log4j.Logger;
import org.jboss.mx.util.MBeanServerLocator;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
import org.jboss.soa.esb.listeners.gateway.TomcatServer;
+import org.jboss.soa.esb.listeners.lifecycle.AbstractManagedLifecycle;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
/**
@@ -47,11 +50,8 @@
*
*/
-public class HttpMessageAwareListener extends MessageAwareListener {
- private static final long serialVersionUID = 1L;
+public class HttpMessageAwareListener extends AbstractManagedLifecycle {
- private static Logger logger = Logger.getLogger(HttpMessageAwareListener.class);
-
/**JMX domain name for tomcat listener*/
public static String DOMAIN_NAME = "jboss.esb.tomcat";
@@ -64,24 +64,31 @@
public String httpContext = null;
public String dispatchServletClassName = "";
+
+
+ private static final long serialVersionUID = 1L;
+
+ private static Logger logger = Logger.getLogger(HttpMessageAwareListener.class);
+
+ private EPR _epr;
public HttpMessageAwareListener(final ConfigTree config)
throws ConfigurationException {
super(config);
List<KeyValuePair> properties = getConfig().childPropertyList();
String maxThreads = getConfig().getAttribute(ListenerTagNames.MAX_THREADS_TAG);
- if (maxThreads != null && getEpr() != null) {
- HTTPEpr epr = (HTTPEpr)getEpr();
- epr.setThreads(maxThreads);
- }
- ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);
+ ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);
host = eprElement.getAttribute(ListenerTagNames.HOST_TAG, "localhost");
port = eprElement.getAttribute(ListenerTagNames.PORT_TAG);
httpContext = eprElement.getAttribute(ListenerTagNames.CONTEXT_TAG);
+ _epr = ListenerUtil.assembleEpr(eprElement);
}
+ protected void doInitialise() throws ManagedLifecycleException {
+
+ }
+
protected void doStart() throws ManagedLifecycleException {
- super.doStart();
try {
this.startHttpServer();
} catch (Exception e) {
@@ -90,7 +97,6 @@
}
}
protected void doStop() throws ManagedLifecycleException {
- super.doStop();
try {
stopHttpServer();
} catch (Exception e) {
@@ -98,6 +104,9 @@
}
}
+ protected void doDestroy() throws ManagedLifecycleException {
+
+ }
/**
* Start the tomcat http server.It will check if it really needs to create
@@ -109,6 +118,7 @@
protected void startHttpServer() throws Exception {
TomcatServer.getInstance().createHost(host);
List<KeyValuePair> properties = getConfig().childPropertyList();
+ //TODO:add maxThreads option
TomcatServer.getInstance().createConnector(port, properties);
ctx = new StandardContext();
@@ -116,9 +126,11 @@
ContextConfig ctxCfg = new ContextConfig();
ctx.addLifecycleListener(ctxCfg);
+
ctxCfg.setDefaultWebXml("org/apache/catalin/startup/NO_DEFAULT_XML");
ctx.setPath(httpContext);
ctx.setDocBase(".");
+ ctx.getServletContext().setAttribute("config", this.getConfig());
initWebappDefaults(ctx);
TomcatServer.getInstance().addContext(host, ctx);
}
More information about the jboss-svn-commits
mailing list