[jboss-svn-commits] JBL Code SVN: r24497 - in labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss: soa/esb/client and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Dec 26 00:22:32 EST 2008


Author: jim.ma
Date: 2008-12-26 00:22:32 -0500 (Fri, 26 Dec 2008)
New Revision: 24497

Modified:
   labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java
   labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java
   labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
   labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
   labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java
Log:
Refactored TomcatListener

Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java	2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpCourier.java	2008-12-26 05:22:32 UTC (rev 24497)
@@ -19,9 +19,10 @@
  */
 package org.jboss.internal.soa.esb.couriers;
 
-import java.io.ObjectInputStream;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.httpclient.HttpClient;
@@ -33,41 +34,56 @@
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.ContextObjectInputStream;
 import org.jboss.soa.esb.util.Util;
+
 /**
- * Deliver the ESB message to http endpoint and pickup ESB message from Http endpoint 
+ * Deliver the ESB message to http endpoint and pickup ESB message from Http
+ * endpoint
+ * 
  * @author <a href="mailto:ema at redhat.com">Jim Ma</a>
  */
 public class HttpCourier implements PickUpOnlyCourier, DeliverOnlyCourier {
-	
+
 	public ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<Message>();
-	
+
 	public static String REPLY_EPR_NAME = "reply";
-	
-	public static String GET_REPLY_NAME = "get";
-	
+
 	protected static Logger logger = Logger.getLogger(InVMCourier.class);
-	
+
 	private HTTPEpr epr;
-	
+
 	private HttpClient httpClient = null;
-	
+
+	private PostMethod method = null;
+
+	private Message repsonseMessage = null;
+
 	public HttpCourier(HTTPEpr epr) {
 		this.epr = epr;
 		HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
-		HttpConnectionManagerParams params = new HttpConnectionManagerParams() ;
-		params.setMaxTotalConnections(Integer.parseInt(epr.getThreads())) ;
+		HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+		params.setMaxTotalConnections(Integer.parseInt(epr.getThreads()));
 		connectionManager.setParams(params);
 		httpClient = new HttpClient(connectionManager);
 	}
+
 	public boolean deliver(Message message) {
 		if (message == null) {
-            return false;
-        }
-	    
-	    PostMethod method = null;
+			return false;
+		}
+
 		try {
-			//TODO:revisit it to see if the "/" can be removed
+			if (epr.getURI().toString().endsWith("reply")) {
+				// do nothing,reply message will be delivered by servlet response
+				return true;
+			}
+		} catch (URISyntaxException e1) {
+			logger.error("Failed to get uri from EPR " + epr);
+			return false;
+		}
+
+		try {
 			method = new PostMethod(epr.getURI().toString() + "/");
 			java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
 
@@ -84,59 +100,28 @@
 
 		try {
 			httpClient.executeMethod(method);
+			if (method.getResponseBody().length > 0) {
+				InputStream ins = method.getResponseBodyAsStream();
+				Serializable serial = (Serializable) new ContextObjectInputStream(
+						ins).readObject();
+				repsonseMessage = Util.deserialize(serial);
+			}
+
 		} catch (Exception e) {
-			logger.error("Failed to deliver message :[ " + message + "] to endpoint " + epr);
+			logger.error("Failed to get response message :[ " + message + "]");
 			return false;
-
 		} finally {
 			method.releaseConnection();
 		}
 
 		return true;
 	}
-	
+
 	public Message pickup(long millis) {
-		Message message = null;
-		if (epr.isReplyEpr()) {
-			PostMethod method = null;
-			try {
-				method = new PostMethod(epr.getURI().toString() + "/" + GET_REPLY_NAME);
-				httpClient.executeMethod(method);
-				byte[] responseBytes = method.getResponseBody();
-				if (responseBytes == null || responseBytes.length == 0) {
-					return null;
-				}
+		return repsonseMessage;
+	}
 
-				java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(
-						responseBytes);
-				Serializable serial = (Serializable) new ObjectInputStream(bin)
-						.readObject();
-				message = Util.deserialize(serial);
-			} catch (Exception e) {
-				logger.error("Failed to pickup message from " + epr); 
-			} finally {
-				method.releaseConnection();
-			}
-		} else {
-
-			synchronized (queue) {
-				message = queue.poll();
-				if (message == null) {
-					try {
-						queue.wait(millis);
-					} catch (InterruptedException e) {
-						
-					}
-					message = queue.poll();
-				}
-
-			}
-		}
-		return message;
+	public void cleanup() {
 	}
-	
-	
-    public void cleanup() {
-    }
-	
-}  
+
+}

Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java	2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/HttpESBListenerServlet.java	2008-12-26 05:22:32 UTC (rev 24497)
@@ -21,9 +21,8 @@
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -31,48 +30,102 @@
 import javax.servlet.http.HttpServletResponse;
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
-import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.message.ActionProcessingPipeline;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.ContextObjectInputStream;
 import org.jboss.soa.esb.util.Util;
 
 
-/**
- * This servlet processes the ESB message delivered to http endpoint and prepare the 
- * reply message to client
- * <p> It will server for these request url :
- * <p> http://localhost:8765/tomcat/listener/
- * <p> http://localhost:8765/tomcat/listener/reply
- * <p> http://localhost:8765/tomcat/listener/reply/get
+/**	
+ * This servlet processes the ESB message delivered to http endpoint and return message processed 
+ * by ActionprossingPipeline
  * @author <a href="mailto:ema at redhat.com">Jim Ma</a>
  */
 public class HttpESBListenerServlet extends HttpServlet {
     HttpCourier requestCourier = null;
     HttpCourier responseCourier = null;
+    private boolean synchronous = false;
+    
+	private ActionProcessingPipeline pipeline ;
+	private EPR epr;
+	private ConfigTree config;
 	public void init() throws ServletException {
-		
+		config = (ConfigTree) getServletContext().getAttribute("config");
+		if (ListenerTagNames.MEP_REQUEST_RESPONSE.equals(config.getAttribute(ListenerTagNames.MEP_ATTRIBUTE_TAG))) {
+			synchronous = true;
+		}
+
+         try
+         {
+             pipeline = new ActionProcessingPipeline(config) ;
+             //pipeline.setTransactional(transactional);
+             pipeline.initialise() ;
+             
+         }
+         catch (final Exception ce)
+         {
+        	throw new ServletException(ce);
+         }
+         
+ 		ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);;
+		try {
+			epr = ListenerUtil.assembleEpr(eprElement);
+			RegistryUtil.register(config, epr);
+		} catch (Exception e) {
+			throw new ServletException(e);		
+		} 
+
+         
 	}
 	
 	protected void doPost(HttpServletRequest req, HttpServletResponse resp)
     throws ServletException, IOException {
-		service(req,resp);
+		doService(req,resp);
 	}
 	
 	
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
     throws ServletException, IOException {
-		service(req,resp);
+		doService(req,resp);
 	}
 	
-	protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+	protected void doService(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		String requestInfo = req.getServletPath();
 		if ("/".equals(requestInfo)) {
-			addRequestMessage(req, resp);
-		} else	if (("/"+ HttpCourier.REPLY_EPR_NAME + "/") .equals(requestInfo) ) {
-			addResponseMessage(req, resp);
-		} else	if (("/"+ HttpCourier.REPLY_EPR_NAME + "/" + HttpCourier.GET_REPLY_NAME).equals(requestInfo)) {
-			getResponseMessage(req, resp);
+			Serializable serial;
+			try {
+				serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
+			} catch (Exception e1) {				
+				throw new ServletException(e1);
+			}
+			try {
+				Message msg = Util.deserialize(serial);
+				boolean result = pipeline.process(msg);
+				if (result && synchronous) {
+					OutputStream output = resp.getOutputStream();
+					try {
+						new ObjectOutputStream(output).writeObject(Util.serialize(msg));
+					} catch (ParserConfigurationException e) {
+						throw new ServletException(e);
+					}
+					output.flush();
+				} 
+				
+				if (!result) {
+					//TODO: Modify the ActionProcessingPiple line to get Fault message from origianl message
+					//Get default fault to message
+				}
+				
+			} catch (Exception e) {
+				throw new ServletException("Error when deserialize the request ESB message");
+				
+			}
+	        	
 		} else {
 			resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
 		}
@@ -80,103 +133,11 @@
 
 	}
 	
-	public void addRequestMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		if (requestCourier == null) {
-			String server = req.getServerName();
-		    int port = req.getServerPort();
-		    String context = req.getContextPath();
-		    String requestURI = "http://" + server + ":" + port + context;
-		    URI uri = null;
-			try {
-				uri = new URI(requestURI);
-				HTTPEpr epr = new HTTPEpr(uri);
-			    requestCourier = CourierFactory.getInstance().getHttpCourier(epr);
-			} catch (URISyntaxException e) {
-				//expected , do nothing
-			}
-            
-		}
-		
-		Serializable serial;
+	public void destroy() {
 		try {
-			serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
-		} catch (Exception e1) {
-			
-			throw new ServletException(e1);
+			RegistryUtil.unregister(config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG), config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG), epr);
+		}catch (Exception e) {
+			e.printStackTrace();
 		}
-		try {
-			Message msg = Util.deserialize(serial);
-			synchronized (requestCourier.queue) {
-				requestCourier.queue.offer(msg);
-				requestCourier.queue.notify();
-			}
-		} catch (Exception e) {
-			throw new ServletException("Error when deserialize the request ESB message");
-			
-		}
 	}
-	
-	public void addResponseMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		if (responseCourier == null) {
-			String server = req.getServerName();
-		    int port = req.getServerPort();
-		    String context = req.getContextPath();
-		    String requestURI = "http://" + server + ":" + port + context + "/" + HttpCourier.REPLY_EPR_NAME;
-		    URI uri = null;
-			try {
-				uri = new URI(requestURI);
-				HTTPEpr epr = new HTTPEpr(uri);
-			    responseCourier = CourierFactory.getInstance().getHttpCourier(epr);
-
-			} catch (URISyntaxException e) {
-				//Do nothing
-			}
-					} 
-		Serializable serial = null;
-		try {
-			serial = (Serializable) new ContextObjectInputStream(req.getInputStream()).readObject();
-		} catch (ClassNotFoundException e1) {
-			throw new ServletException(e1);
-		}
-		try {
-			Message msg = Util.deserialize(serial);
-			synchronized (responseCourier.queue) {
-				responseCourier.queue.offer(msg);
-				responseCourier.queue.notify();
-			}
-		} catch (Exception e) {
-			throw new ServletException("Error when deserialize the request ESB message");
-			
-		}
-	}
-	
-	public void getResponseMessage(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		if (responseCourier == null) {
-			return;
-		}
-		Message msg = null;
-		synchronized (responseCourier.queue) {
-			msg = responseCourier.queue.poll();
-			if (msg == null) {
-				try {
-					responseCourier.queue.wait(2000);
-				} catch (InterruptedException e) {
-					//Do nothing
-				}
-				msg = responseCourier.queue.poll();
-			}
-		}
-		if (msg == null) {
-			return;
-		}
-		java.io.OutputStream output = resp.getOutputStream();
-		try {
-			new ObjectOutputStream(output).writeObject(Util.serialize(msg));
-		} catch (ParserConfigurationException e) {
-			throw new ServletException(e);
-		}
-		output.flush();
-		
-	}	
-
 }

Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java	2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/internal/soa/esb/couriers/TwoWayCourierImpl.java	2008-12-26 05:22:32 UTC (rev 24497)
@@ -31,6 +31,7 @@
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.*;
+
 import org.jboss.soa.esb.filter.FilterManager;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.util.Type;
@@ -127,14 +128,12 @@
 			return null;
         if (epr instanceof InVMEpr)
             return CourierFactory.getInstance().getInVMCourier((InVMEpr) epr);
-		if (epr instanceof HTTPEpr)
-			return CourierFactory.getInstance().getHttpCourier((HTTPEpr)epr);
-        
+		if (epr instanceof HTTPEpr) 
+			return new HttpCourier((HTTPEpr)epr);	        
         if (epr instanceof JMSEpr)
 			return new JmsCourier((JMSEpr) epr, pickUpOnly);
 		if (epr instanceof FileEpr)
-			return new FileCourier((FileEpr) epr, pickUpOnly);
-		
+			return new FileCourier((FileEpr) epr, pickUpOnly);		
 		if (epr instanceof JDBCEpr)
 			return new SqlTableCourier((JDBCEpr) epr, pickUpOnly);
 

Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2008-12-26 05:22:32 UTC (rev 24497)
@@ -24,8 +24,11 @@
 import java.util.List;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.addressing.eprs.DefaultHttpReplyToEpr;
 import org.jboss.internal.soa.esb.addressing.helpers.EPRHelper;
 import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.internal.soa.esb.couriers.HttpCourier;
+import org.jboss.internal.soa.esb.couriers.TwoWayCourierImpl;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.Service;
 import org.jboss.soa.esb.addressing.EPR;
@@ -529,12 +532,22 @@
                     }
                     if (courier.deliver(message)) {
                         if (synchronous) {
-                            courier.cleanup() ;
+                            
                             // JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
 
                             // do we need to do this for synchronous calls? Vagueries of Couriers?
 
+                           //It is a big hack for HTTP EPR. Look at if we can create another TwoWayCourierImpl to do that
+                            if (replyToEPR instanceof DefaultHttpReplyToEpr) {
+                                TwoWayCourierImpl twoWayCourier = (TwoWayCourierImpl)courier;
+                                HttpCourier httpCourier = (HttpCourier)twoWayCourier.getDeliverCourier();
+                                return httpCourier.pickup(timeout);
+                                
+                            }
+                            courier.cleanup() ;
+                            
                             courier.setReplyToEpr(replyToEPR);
+                            
                             return courier.pickup(timeout);
                         } else {
                             return message;

Modified: labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java
===================================================================
--- labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java	2008-12-24 00:54:33 UTC (rev 24496)
+++ labs/jbossesb/workspace/mlittle/legstar/product/rosetta/src/org/jboss/soa/esb/listeners/message/HttpMessageAwareListener.java	2008-12-26 05:22:32 UTC (rev 24497)
@@ -33,11 +33,14 @@
 import org.apache.log4j.Logger;
 import org.jboss.mx.util.MBeanServerLocator;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.helpers.KeyValuePair;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
 import org.jboss.soa.esb.listeners.gateway.TomcatServer;
+import org.jboss.soa.esb.listeners.lifecycle.AbstractManagedLifecycle;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
 
 /**
@@ -47,11 +50,8 @@
  *
  */
  
-public class HttpMessageAwareListener extends MessageAwareListener {
-	private static final long serialVersionUID = 1L;
+public class HttpMessageAwareListener extends AbstractManagedLifecycle {
 	
-	private static Logger logger = Logger.getLogger(HttpMessageAwareListener.class);
-	
 	/**JMX domain name for tomcat listener*/
 	public static String DOMAIN_NAME = "jboss.esb.tomcat";
 	
@@ -64,24 +64,31 @@
 	public String httpContext = null;
 	
 	public String dispatchServletClassName = "";
+	
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static Logger logger = Logger.getLogger(HttpMessageAwareListener.class);
+	
+	private EPR _epr;
 
 	public HttpMessageAwareListener(final ConfigTree config)
 			throws ConfigurationException {
 		super(config);
 		List<KeyValuePair> properties = getConfig().childPropertyList();
 		String maxThreads = getConfig().getAttribute(ListenerTagNames.MAX_THREADS_TAG);
-		if (maxThreads != null && getEpr() != null) {
-			HTTPEpr epr = (HTTPEpr)getEpr();
-			epr.setThreads(maxThreads);
-		}
-		ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);
+	    ConfigTree eprElement = config.getFirstChild(ListenerTagNames.EPR_TAG);
 		host = eprElement.getAttribute(ListenerTagNames.HOST_TAG, "localhost");
 		port = eprElement.getAttribute(ListenerTagNames.PORT_TAG);
 		httpContext = eprElement.getAttribute(ListenerTagNames.CONTEXT_TAG);
+		_epr = ListenerUtil.assembleEpr(eprElement);
 	}
 
+	protected void doInitialise() throws ManagedLifecycleException {
+		
+	}
+	
 	protected void doStart() throws ManagedLifecycleException {
-		super.doStart();
 		try {
 			this.startHttpServer();
 		} catch (Exception e) {
@@ -90,7 +97,6 @@
 		}		
 	}
 	protected void doStop() throws ManagedLifecycleException {
-		super.doStop();
 		try {
 			stopHttpServer();
 		} catch (Exception e) {
@@ -98,6 +104,9 @@
 		}
 	}
 	
+    protected void doDestroy() throws ManagedLifecycleException {
+		
+	}
 	
 	/**
 	 * Start the tomcat http server.It will check if it really needs to create
@@ -109,6 +118,7 @@
 	protected void startHttpServer() throws Exception {	
         TomcatServer.getInstance().createHost(host);
 		List<KeyValuePair> properties = getConfig().childPropertyList();
+		//TODO:add maxThreads option
 		TomcatServer.getInstance().createConnector(port, properties);
 		
 		ctx = new StandardContext();
@@ -116,9 +126,11 @@
 		ContextConfig ctxCfg = new ContextConfig();
 		ctx.addLifecycleListener(ctxCfg);
 		
+		
 		ctxCfg.setDefaultWebXml("org/apache/catalin/startup/NO_DEFAULT_XML");
 		ctx.setPath(httpContext);
 		ctx.setDocBase(".");
+		ctx.getServletContext().setAttribute("config", this.getConfig());
 		initWebappDefaults(ctx);
 		TomcatServer.getInstance().addContext(host, ctx);
 	}




More information about the jboss-svn-commits mailing list