[jboss-svn-commits] JBL Code SVN: r15291 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/message/filter and 8 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Sep 22 12:53:16 EDT 2007


Author: mark.little at jboss.com
Date: 2007-09-22 12:53:16 -0400 (Sat, 22 Sep 2007)
New Revision: 15291

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java
Modified:
   labs/jbossesb/trunk/product/docs/AdministrationGuide.odt
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1009

Modified: labs/jbossesb/trunk/product/docs/AdministrationGuide.odt
===================================================================
(Binary files differ)

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.message.filter;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.filter.InputOutputFilter;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * This input/output filter can be used for message tracing. It is enabled on a global
+ * basis and assumes that each message is uniquely identified by the MessageID in the
+ * header: otherwise tracing specific messages is impossible to guarantee.
+ * 
+ * @author marklittle
+ */
+
+public class TraceFilter extends InputOutputFilter
+{
+    /*
+     * Message properties.
+     */
+    
+    public static final String UNLOGGABLE_MESSAGE = "org.jboss.soa.esb.message.unloggable"; // yes or no message property
+	
+    public Message onOutput (Message msg, Map<String, Object> params) throws CourierException
+    {
+	if (_enabled)
+	{
+	    /*
+	     * If marked as unloggable then ignore this message.
+	     */
+
+	    boolean ignore = false;
+	    
+	    if (_permsg)
+	    {
+		Object unloggable = msg.getProperties().getProperty(UNLOGGABLE_MESSAGE, "no");
+		
+        	if (unloggable instanceof String)
+        	{
+        	    if (((String) unloggable).equalsIgnoreCase("yes"))
+        		ignore = true;
+        	}
+	    }
+	    
+	    if (!ignore)
+	    {
+		_logger.info("TraceFilter.onOutput ( "+msg.getHeader()+" )");
+	    }
+	}
+	
+	return msg;
+    }
+
+    public Message onInput (Message msg, Map<String, Object> params) throws CourierException
+    {
+	if (_enabled)
+	{
+	    /*
+	     * If marked as unloggable then ignore this message.
+	     */
+
+	    boolean ignore = false;
+	    
+	    if (_permsg)
+	    {
+		Object unloggable = msg.getProperties().getProperty(UNLOGGABLE_MESSAGE, "no");
+		
+        	if (unloggable instanceof String)
+        	{
+        	    if (((String) unloggable).equalsIgnoreCase("yes"))
+        		ignore = true;
+        	}
+	    }
+	    
+	    if (!ignore)
+	    {
+		_logger.info("TraceFilter.onInput ( "+msg.getHeader()+" )");
+	    }
+	}
+	
+	return msg;
+    }
+
+    public TraceFilter ()
+    {
+	if (ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE).getProperty(Environment.MESSAGE_TRACE, "off").equalsIgnoreCase("on"))
+	{
+	    _enabled = true;
+	}
+	else
+	    _enabled = false;
+	
+	if (ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE).getProperty(Environment.PER_MESSAGE_TRACE, "off").equalsIgnoreCase("on"))
+	{
+	    _permsg = true;
+	}
+	else
+	    _permsg = false;
+    }
+    
+    private boolean _enabled;
+    private boolean _permsg;
+    private static final Logger _logger = Logger.getLogger(TraceFilter.class);
+}

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -157,6 +157,9 @@
                     && !service.equals(dlqService)) {
                 //Send a copy to the DLQ, no retries for syncDeliveries
                 message.getProperties().setProperty(DELIVER_TO, service);
+                
+                logger.info("Delivering message ["+message.getHeader()+"] to DLQ.");
+                
                 deliverToDeadLetterService(message);
             }
             throw mde;
@@ -185,6 +188,8 @@
                 message.getProperties().setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR);
                 message.getProperties().setProperty(DELIVER_TO, service);
                 try {
+                    logger.info("Delivering message ["+message.getHeader()+"] to RDLVRQ.");
+                    
                     deliverToDeadLetterService(message);
                 } finally {
                     message.getProperties().remove(MessageStore.CLASSIFICATION);
@@ -216,6 +221,7 @@
             if (dlQueueInvoker == null) {
                 dlQueueInvoker = new ServiceInvoker(dlqService);
             }
+             
             dlQueueInvoker.deliverAsync(message);
         }
     }
@@ -252,14 +258,15 @@
                     // We've delivered it, we're done!
                     return replyMessage;
                 } else {
-                    logger.info("Unresponsive EPR: " + epr);
+                    logger.info("Unresponsive EPR: " + epr+" for message: "+message.getHeader());
+                    
                     serviceClusterInfo.removeDeadEPR(epr);
                 }
             }
         }
 
         // Throw exception if delivery failed...
-        throw new MessageDeliverException("Failed to deliver message to Service [" + service + "].  Check for errors.");
+        throw new MessageDeliverException("Failed to deliver message ["+message.getHeader()+"] to Service [" + service + "].  Check for errors.");
     }
 
     /**
@@ -377,11 +384,11 @@
             try {
                 courier = getCourier(epr);
             } catch (CourierException e) {
-                logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
+                logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", e);
             } catch (MalformedEPRException e) {
-                logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+                logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]." + e.getMessage());
             } catch (Throwable t) {
-                logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
+                logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
             }
 
             // Try delivering the message using the courier we just looked up....
@@ -401,7 +408,7 @@
                             replyToEPR = getReplyToAddress(epr);
 
                         if (replyToEPR == null) {
-                            logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
+                            logger.debug("Not using epr [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. No reply-to address available for synchronous response.");
                             return null;
                         }
                         message.getHeader().getCall().setReplyTo(replyToEPR);
@@ -421,12 +428,12 @@
                 } catch (FaultMessageException e) {
                     throw e;
                 } catch (CourierException e) {
-                    logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+                    logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());
                 } catch (MalformedEPRException e) {
                     // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
                     logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
                 } catch (Throwable t) {
-                    logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
+                    logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
                 } finally {
                     // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
                     CourierUtil.cleanCourier(courier);

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -169,4 +169,11 @@
 	 */
 
 	public static final String FILTER_NAME = "org.jboss.soa.esb.filter";
+	
+	/*
+	 * Some specific out-of-the-box filter configuration options.
+	 */
+	
+	public static final String MESSAGE_TRACE = "org.jboss.soa.esb.messagetrace"; // on or off
+	public static final String PER_MESSAGE_TRACE = "org.jboss.soa.esb.permessagetrace"; // on or off
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java	2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -174,11 +174,11 @@
 		    }
 		    catch (ClassNotFoundException ex)
 		    {
-			_logger.warn("FilterManager problem loading class.", ex);
+			_logger.warn("FilterManager problem loading class "+filterName, ex);
 		    }
 		    catch (Throwable ex)
 		    {
-			_logger.warn("FilterManager problem during load.", ex);
+			_logger.warn("FilterManager problem during load "+filterName, ex);
 		    }
 		}
 	    }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -240,12 +240,12 @@
 		final EPR faultToAddress = DefaultFaultTo.getFaultToAddress(message);
 		long start = System.nanoTime();
 		serviceMessageCounter.incrementTotalCount();
-		
+
 		if (active.get())
 		{
 			if (LOGGER.isDebugEnabled())
 			{
-				LOGGER.debug("pipeline process for message");
+				LOGGER.debug("pipeline process for message: "+message.getHeader());
 			}
 
 			final int numProcessors = processors.length;
@@ -261,7 +261,8 @@
 
 				try
 				{
-					LOGGER.debug("executing processor " + count);
+					LOGGER.debug("executing processor " + count+ " "+processor+" "+message.getHeader());
+					
 					currentMessage = processor.process(currentMessage);
 
 					if (currentMessage == null)
@@ -273,7 +274,7 @@
 				{
 					LOGGER
 							.warn(
-									"Unexpected exception caught while processing the action pipeline",
+									"Unexpected exception caught while processing the action pipeline: "+message.getHeader(),
 									ex);
 
 					notifyException(count, ex, messages);
@@ -325,7 +326,7 @@
 		}
 		else
 		{
-			LOGGER.debug("pipeline process disabled for message");
+			LOGGER.debug("pipeline process disabled for message: "+message.getHeader());
 
 			faultTo(faultToAddress, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
 			long procTime = System.nanoTime() - start;
@@ -355,7 +356,7 @@
 
 		if (replyToAddress == null)
 		{
-			LOGGER.warn("No reply to address defined for reply message!");
+			LOGGER.warn("No reply to address defined for reply message! "+message.getHeader());
 		}
 		else
 		{
@@ -366,17 +367,17 @@
 			}
 			catch (final CourierException e)
 			{
-				LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+				LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
 						e);
 			}
 			catch (final MalformedEPRException e)
 			{
-				LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+				LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
 						e);
 			}
 			catch (final Throwable e)
 			{
-				LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+				LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
 						e);
 			}
 			finally
@@ -408,7 +409,7 @@
 
 		if (faultToAddress == null)
 		{
-			LOGGER.warn("No fault address defined for fault message!");
+			LOGGER.warn("No fault address defined for fault message! "+message.getHeader());
 		}
 		else
 		{
@@ -420,17 +421,17 @@
 			catch (final CourierException e)
 			{
 				LOGGER.error("Failed to send error to address " + faultToAddress
-						+ ".", e);
+						+ " for message "+message.getHeader(), e);
 			}
 			catch (final MalformedEPRException e)
 			{
 				LOGGER.error("Failed to send error to address " + faultToAddress
-						+ ".", e);
+						+ " for message "+message.getHeader(), e);
 			}
 			catch (final Throwable e)
 			{
 				LOGGER.error("Failed to send error to address " + faultToAddress
-						+ ".", e);
+						+ " for message "+message.getHeader(), e);
 			}
 			finally
 			{

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java	2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -84,7 +84,7 @@
 			return errorMessage;
 		}
 		else
-			return null;
+			return input;
 	}
 	
 	/**

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java	2007-09-22 16:53:16 UTC (rev 15291)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.tests.filter;
+
+import java.net.URI;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.addressing.eprs.FTPEpr;
+import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.filter.FilterManager;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.body.content.BytesBody;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+public class TraceFilterUnitTest
+{
+    @BeforeClass
+    public static void setUp () throws Exception
+    {
+	PropertyManager pm = ModulePropertyManager
+		.getPropertyManager(ModulePropertyManager.FILTER_MODULE);
+	pm.setProperty("org.jboss.soa.esb.filter.0",
+		"org.jboss.internal.soa.esb.message.filter.TraceFilter");
+	pm.setProperty(Environment.MESSAGE_TRACE, "on");
+    }
+
+    public static junit.framework.Test suite ()
+    {
+	return new JUnit4TestAdapter(TraceFilterUnitTest.class);
+    }
+
+    @Test
+    public void testTracing () throws Exception
+    {
+	Message msg = MessageFactory.getInstance().getMessage();
+	
+	msg.getHeader().getCall().setTo(new FTPEpr("ftp://foo.bar"));
+	msg.getHeader().getCall().setReplyTo(new HTTPEpr("http://bar.foo"));
+	msg.getHeader().getCall().setAction(new URI("urn:dowork"));
+	msg.getBody().add(BytesBody.BYTES_LOCATION, "Hello World".getBytes());
+	msg.getHeader().getCall().setMessageID(new URI("urn:foo/bar/1234"));
+
+	Message output = FilterManager.getInstance().doOutputWork(msg, null);
+
+	Assert.assertNotNull(output);
+	
+	Message input = FilterManager.getInstance().doInputWork(msg, null);
+
+	Assert.assertNotNull(input);
+    }
+}




More information about the jboss-svn-commits mailing list