[jboss-svn-commits] JBL Code SVN: r15291 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/message/filter and 8 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Sep 22 12:53:16 EDT 2007
Author: mark.little at jboss.com
Date: 2007-09-22 12:53:16 -0400 (Sat, 22 Sep 2007)
New Revision: 15291
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java
Modified:
labs/jbossesb/trunk/product/docs/AdministrationGuide.odt
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1009
Modified: labs/jbossesb/trunk/product/docs/AdministrationGuide.odt
===================================================================
(Binary files differ)
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/message/filter/TraceFilter.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.message.filter;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.filter.InputOutputFilter;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * This input/output filter can be used for message tracing. It is enabled on a global
+ * basis and assumes that each message is uniquely identified by the MessageID in the
+ * header: otherwise tracing specific messages is impossible to guarantee.
+ *
+ * @author marklittle
+ */
+
+public class TraceFilter extends InputOutputFilter
+{
+    /**
+     * Message property ("yes" or "no", case-insensitive) that marks a message
+     * as not to be traced. Only consulted when per-message tracing is "on".
+     */
+    public static final String UNLOGGABLE_MESSAGE = "org.jboss.soa.esb.message.unloggable";
+
+    /**
+     * Logs the header of an outgoing message if tracing applies to it.
+     *
+     * @param msg the message being sent.
+     * @param params filter parameters; not used by this filter.
+     * @return the same message instance, unmodified.
+     */
+    public Message onOutput (Message msg, Map<String, Object> params) throws CourierException
+    {
+        trace("TraceFilter.onOutput", msg);
+
+        return msg;
+    }
+
+    /**
+     * Logs the header of an incoming message if tracing applies to it.
+     *
+     * @param msg the message that was received.
+     * @param params filter parameters; not used by this filter.
+     * @return the same message instance, unmodified.
+     */
+    public Message onInput (Message msg, Map<String, Object> params) throws CourierException
+    {
+        trace("TraceFilter.onInput", msg);
+
+        return msg;
+    }
+
+    /**
+     * Reads the global and per-message trace switches from the filter module
+     * properties. Both default to "off"; the values are read once, here.
+     */
+    public TraceFilter ()
+    {
+        _enabled = isSwitchedOn(Environment.MESSAGE_TRACE);
+        _permsg = isSwitchedOn(Environment.PER_MESSAGE_TRACE);
+    }
+
+    /**
+     * Shared body of onInput/onOutput: writes one INFO line containing the
+     * message header, unless tracing is off or the message opted out.
+     *
+     * @param caller text that prefixes the log line, e.g. "TraceFilter.onInput".
+     * @param msg the message whose header is logged.
+     */
+    private void trace (String caller, Message msg)
+    {
+        // Skip the property lookup and header formatting when nothing would be logged.
+        if (!_enabled || !_logger.isInfoEnabled())
+            return;
+
+        if (_permsg && isMarkedUnloggable(msg))
+            return;
+
+        _logger.info(caller+" ( "+msg.getHeader()+" )");
+    }
+
+    /**
+     * @return true if the UNLOGGABLE_MESSAGE property is the String "yes" (any
+     *         case); any other value, including the "no" default, gives false.
+     */
+    private static boolean isMarkedUnloggable (Message msg)
+    {
+        Object unloggable = msg.getProperties().getProperty(UNLOGGABLE_MESSAGE, "no");
+
+        return (unloggable instanceof String) && ((String) unloggable).equalsIgnoreCase("yes");
+    }
+
+    /**
+     * @return true if the named filter module property is set to "on" (any case).
+     */
+    private static boolean isSwitchedOn (String propertyName)
+    {
+        return "on".equalsIgnoreCase(ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE).getProperty(propertyName, "off"));
+    }
+
+    // Trace switches, fixed at construction time.
+    private final boolean _enabled;
+    private final boolean _permsg;
+    private static final Logger _logger = Logger.getLogger(TraceFilter.class);
+}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -157,6 +157,9 @@
&& !service.equals(dlqService)) {
//Send a copy to the DLQ, no retries for syncDeliveries
message.getProperties().setProperty(DELIVER_TO, service);
+
+ logger.info("Delivering message ["+message.getHeader()+"] to DLQ.");
+
deliverToDeadLetterService(message);
}
throw mde;
@@ -185,6 +188,8 @@
message.getProperties().setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR);
message.getProperties().setProperty(DELIVER_TO, service);
try {
+ logger.info("Delivering message ["+message.getHeader()+"] to RDLVRQ.");
+
deliverToDeadLetterService(message);
} finally {
message.getProperties().remove(MessageStore.CLASSIFICATION);
@@ -216,6 +221,7 @@
if (dlQueueInvoker == null) {
dlQueueInvoker = new ServiceInvoker(dlqService);
}
+
dlQueueInvoker.deliverAsync(message);
}
}
@@ -252,14 +258,15 @@
// We've delivered it, we're done!
return replyMessage;
} else {
- logger.info("Unresponsive EPR: " + epr);
+ logger.info("Unresponsive EPR: " + epr+" for message: "+message.getHeader());
+
serviceClusterInfo.removeDeadEPR(epr);
}
}
}
// Throw exception if delivery failed...
- throw new MessageDeliverException("Failed to deliver message to Service [" + service + "]. Check for errors.");
+ throw new MessageDeliverException("Failed to deliver message ["+message.getHeader()+"] to Service [" + service + "]. Check for errors.");
}
/**
@@ -377,11 +384,11 @@
try {
courier = getCourier(epr);
} catch (CourierException e) {
- logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
+ logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", e);
} catch (MalformedEPRException e) {
- logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+ logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]." + e.getMessage());
} catch (Throwable t) {
- logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
+ logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
}
// Try delivering the message using the courier we just looked up....
@@ -401,7 +408,7 @@
replyToEPR = getReplyToAddress(epr);
if (replyToEPR == null) {
- logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
+ logger.debug("Not using epr [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. No reply-to address available for synchronous response.");
return null;
}
message.getHeader().getCall().setReplyTo(replyToEPR);
@@ -421,12 +428,12 @@
} catch (FaultMessageException e) {
throw e;
} catch (CourierException e) {
- logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+ logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"]. " + e.getMessage());
} catch (MalformedEPRException e) {
// Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
} catch (Throwable t) {
- logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
+ logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "] and Message ["+message.getHeader()+"].", t);
} finally {
// TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
CourierUtil.cleanCourier(courier);
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -169,4 +169,11 @@
*/
public static final String FILTER_NAME = "org.jboss.soa.esb.filter";
+
+ /*
+ * Some specific out-of-the-box filter configuration options.
+ */
+
+ public static final String MESSAGE_TRACE = "org.jboss.soa.esb.messagetrace"; // on or off
+ public static final String PER_MESSAGE_TRACE = "org.jboss.soa.esb.permessagetrace"; // on or off
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java 2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/filter/FilterManager.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -174,11 +174,11 @@
}
catch (ClassNotFoundException ex)
{
- _logger.warn("FilterManager problem loading class.", ex);
+ _logger.warn("FilterManager problem loading class "+filterName, ex);
}
catch (Throwable ex)
{
- _logger.warn("FilterManager problem during load.", ex);
+ _logger.warn("FilterManager problem during load "+filterName, ex);
}
}
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -240,12 +240,12 @@
final EPR faultToAddress = DefaultFaultTo.getFaultToAddress(message);
long start = System.nanoTime();
serviceMessageCounter.incrementTotalCount();
-
+
if (active.get())
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("pipeline process for message");
+ LOGGER.debug("pipeline process for message: "+message.getHeader());
}
final int numProcessors = processors.length;
@@ -261,7 +261,8 @@
try
{
- LOGGER.debug("executing processor " + count);
+ LOGGER.debug("executing processor " + count+ " "+processor+" "+message.getHeader());
+
currentMessage = processor.process(currentMessage);
if (currentMessage == null)
@@ -273,7 +274,7 @@
{
LOGGER
.warn(
- "Unexpected exception caught while processing the action pipeline",
+ "Unexpected exception caught while processing the action pipeline: "+message.getHeader(),
ex);
notifyException(count, ex, messages);
@@ -325,7 +326,7 @@
}
else
{
- LOGGER.debug("pipeline process disabled for message");
+ LOGGER.debug("pipeline process disabled for message: "+message.getHeader());
faultTo(faultToAddress, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
long procTime = System.nanoTime() - start;
@@ -355,7 +356,7 @@
if (replyToAddress == null)
{
- LOGGER.warn("No reply to address defined for reply message!");
+ LOGGER.warn("No reply to address defined for reply message! "+message.getHeader());
}
else
{
@@ -366,17 +367,17 @@
}
catch (final CourierException e)
{
- LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+ LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
e);
}
catch (final MalformedEPRException e)
{
- LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+ LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
e);
}
catch (final Throwable e)
{
- LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+ LOGGER.error("Failed to reply to address " + replyToAddress + " for message "+message.getHeader(),
e);
}
finally
@@ -408,7 +409,7 @@
if (faultToAddress == null)
{
- LOGGER.warn("No fault address defined for fault message!");
+ LOGGER.warn("No fault address defined for fault message! "+message.getHeader());
}
else
{
@@ -420,17 +421,17 @@
catch (final CourierException e)
{
LOGGER.error("Failed to send error to address " + faultToAddress
- + ".", e);
+ + " for message "+message.getHeader(), e);
}
catch (final MalformedEPRException e)
{
LOGGER.error("Failed to send error to address " + faultToAddress
- + ".", e);
+ + " for message "+message.getHeader(), e);
}
catch (final Throwable e)
{
LOGGER.error("Failed to send error to address " + faultToAddress
- + ".", e);
+ + " for message "+message.getHeader(), e);
}
finally
{
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java 2007-09-22 16:27:43 UTC (rev 15290)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/errors/Factory.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -84,7 +84,7 @@
return errorMessage;
}
else
- return null;
+ return input;
}
/**
Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/tests/filter/TraceFilterUnitTest.java 2007-09-22 16:53:16 UTC (rev 15291)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.tests.filter;
+
+import java.net.URI;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.addressing.eprs.FTPEpr;
+import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.filter.FilterManager;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.body.content.BytesBody;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
+public class TraceFilterUnitTest
+{
+    /** Registers TraceFilter as filter 0 and turns global message tracing on. */
+    @BeforeClass
+    public static void setUp () throws Exception
+    {
+        final PropertyManager filterProperties = ModulePropertyManager.getPropertyManager(ModulePropertyManager.FILTER_MODULE);
+
+        filterProperties.setProperty("org.jboss.soa.esb.filter.0", "org.jboss.internal.soa.esb.message.filter.TraceFilter");
+        filterProperties.setProperty(Environment.MESSAGE_TRACE, "on");
+    }
+
+    /** Wraps this JUnit 4 class in a JUnit4TestAdapter and returns it as a junit.framework.Test. */
+    public static junit.framework.Test suite ()
+    {
+        return new JUnit4TestAdapter(TraceFilterUnitTest.class);
+    }
+
+    /**
+     * Pushes one populated message through FilterManager's output work and then
+     * its input work, and checks that each pass hands back a non-null message.
+     */
+    @Test
+    public void testTracing () throws Exception
+    {
+        final Message traced = MessageFactory.getInstance().getMessage();
+
+        traced.getHeader().getCall().setTo(new FTPEpr("ftp://foo.bar"));
+        traced.getHeader().getCall().setReplyTo(new HTTPEpr("http://bar.foo"));
+        traced.getHeader().getCall().setAction(new URI("urn:dowork"));
+        traced.getBody().add(BytesBody.BYTES_LOCATION, "Hello World".getBytes());
+        traced.getHeader().getCall().setMessageID(new URI("urn:foo/bar/1234"));
+
+        Assert.assertNotNull(FilterManager.getInstance().doOutputWork(traced, null));
+        Assert.assertNotNull(FilterManager.getInstance().doInputWork(traced, null));
+    }
+}
More information about the jboss-svn-commits
mailing list