[jboss-svn-commits] JBL Code SVN: r14636 - labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Aug 27 08:31:17 EDT 2007


Author: tcunning
Date: 2007-08-27 08:31:17 -0400 (Mon, 27 Aug 2007)
New Revision: 14636

Added:
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksMessageCounter.java
Modified:
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksTransformer.java
Log:
bug:JBESB-669
Adding a Smooks message counter.


Added: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksMessageCounter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksMessageCounter.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksMessageCounter.java	2007-08-27 12:31:17 UTC (rev 14636)
@@ -0,0 +1,387 @@
+package org.jboss.soa.esb.actions.converters;
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+import java.util.HashMap;
+import java.util.List;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.log4j.Logger;
+import org.jboss.mx.util.MBeanServerLocator;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.message.ActionStatusBean;
+
+/**
+ * Smooks message counter is an MBean that shows how many messages have been
+ * processed successfully and how many have failed, along with the processing time
+ * of each.
+ * 
+ * @author <a href="mailto:tcunning at redhat.com">tcunning at redhat.com</a>
+ * @since Version 4.2
+ */
+public class SmooksMessageCounter implements DynamicMBean {
+	private static final Logger logger = Logger.getLogger(SmooksMessageCounter.class);
+	public static final String TRANSFORM_SUCCEED = "SUCCEED";
+	public static final String TRANSFORM_FAILED = "FAILED";
+	
+	private HashMap<String, Integer> actionCounterHash;
+	private HashMap<String, Integer> actionFailedCounterHash;
+	private HashMap<String, Long> actionProcessTimeHash;
+	private ConfigTree m_config;
+	private ConfigTree[] actionArray;
+	private Integer serviceCount;
+	
+	public static final String RESET_COUNTER = "resetCounter";
+	private static final String MESSAGE_COUNTER = "messages successfully processed count";
+	private static final String FAILED_MESSAGE_COUNTER = "messages failed count";
+	private static final String PROCESSING_TIME = "processing time";
+	private static final String OVERALL_SERVICE_COUNT = "overall message transform count";
+	
+	/**
+	 * Builds a counter for the actions found beneath the supplied config tree.
+	 * The three hashes are created empty and then seeded by initHashes(); the
+	 * overall service count starts at zero.
+	 * @param f_config config tree
+	 */
+	public SmooksMessageCounter(ConfigTree f_config) {
+		m_config = f_config;
+		actionArray = f_config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
+
+		actionProcessTimeHash = new HashMap<String, Long>();
+		actionFailedCounterHash = new HashMap<String, Integer>();
+		actionCounterHash = new HashMap<String, Integer>();
+		serviceCount = Integer.valueOf(0);
+
+		// Adds one zeroed entry per configured action to each of the three hashes.
+		initHashes();
+	}
+	
+	/**
+	 * Return an action id.   If there's an action name, use that, if not,
+	 * use the "action" attribute on the action.
+	 * @param ct config tree
+	 * @return action id
+	 */
+	public String getActionId(ConfigTree ct) {
+		if (ct.getAttribute("name") != null) {
+			return ct.getAttribute("name");
+		} else if (ct.getAttribute("action") != null) {
+			return ct.getAttribute("action");
+		}
+		return null;
+	}
+	
+	/**
+	 * Increment the total message count of this service.
+	 */
+	public void incrementTotalCount() {
+		serviceCount = new Integer(serviceCount.intValue() + 1);
+	}
+	
+	/**
+	 * Seed the hashes: every configured action gets a success count, a failed count
+	 * and a processing time entry, each with an initial value of 0.  The overall
+	 * service count is set to 0 as well.
+	 */
+	public void initHashes() {
+		final Integer zeroCount = Integer.valueOf(0);
+		final Long zeroTime = Long.valueOf(0L);
+
+		ConfigTree[] actions = m_config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
+		for (int i = 0; i < actions.length; i++) {
+			// Every key is the action id, a space, and the suffix naming the counter.
+			String prefix = getActionId(actions[i]) + " ";
+			actionCounterHash.put(prefix + MESSAGE_COUNTER, zeroCount);
+			actionFailedCounterHash.put(prefix + FAILED_MESSAGE_COUNTER, zeroCount);
+			actionProcessTimeHash.put(prefix + PROCESSING_TIME, zeroTime);
+		}
+		serviceCount = zeroCount;
+	}
+	
+	/**
+	 * Reset the counters - set the overall service count and every existing entry
+	 * in the action counter hash, the failed counter hash and the action process
+	 * time hash to zero.
+	 */
+	public void resetCounter() {
+		serviceCount = Integer.valueOf(0);
+
+		// The keys in each hash already are the full attribute names (id plus suffix),
+		// so the value is overwritten under the existing key.  Appending the suffix
+		// again would add new keys during iteration and leave the real entries untouched.
+		for (String key : actionCounterHash.keySet()) {
+			actionCounterHash.put(key, Integer.valueOf(0));
+		}
+
+		for (String key : actionFailedCounterHash.keySet()) {
+			actionFailedCounterHash.put(key, Integer.valueOf(0));
+		}
+
+		for (String key : actionProcessTimeHash.keySet()) {
+			actionProcessTimeHash.put(key, Long.valueOf(0L));
+		}
+	}
+	
+	/**
+	 * Build the MBeanInfo describing this counter.  One attribute is published for
+	 * every entry in the success count, processing time and failed count hashes,
+	 * plus one for the overall service count.  The attribute descriptions are generic
+	 * text (the word Property and the name of the attribute) and all of the attributes
+	 * are read-only.  The only operation published is resetCounter.
+	 * @return the MBeanInfo for this MBean
+	 */
+	public MBeanInfo getMBeanInfo() {
+		int count = actionCounterHash.size() + actionProcessTimeHash.size()
+			+ actionFailedCounterHash.size() + 1;
+		MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[count];
+		int counter = 0;
+
+		for (String key : actionCounterHash.keySet()) {
+			attrs[counter] = new MBeanAttributeInfo(
+					key, "java.lang.Integer", "Property " + key, true, false, false);
+			counter++;
+		}
+
+		for (String key : actionProcessTimeHash.keySet()) {
+			attrs[counter] = new MBeanAttributeInfo(
+					key, "java.lang.Double", "Property " + key, true, false, false);
+			counter++;
+		}
+
+		for (String key : actionFailedCounterHash.keySet()) {
+			attrs[counter] = new MBeanAttributeInfo(
+					key, "java.lang.Integer", "Property " + key, true, false, false);
+			counter++;
+		}
+
+		// The array always reserves its last slot for the overall count, so the slot is
+		// always filled; leaving it out when there are no per-action entries would hand
+		// a null element to MBeanInfo.
+		attrs[counter] = new MBeanAttributeInfo(OVERALL_SERVICE_COUNT, "java.lang.Integer",
+				"Property " + OVERALL_SERVICE_COUNT, true, false, false);
+
+		MBeanOperationInfo[] opers = {
+			new MBeanOperationInfo(
+					RESET_COUNTER, "Reset the counter",
+					null, "void", MBeanOperationInfo.ACTION)
+		};
+		return new MBeanInfo(
+				this.getClass().getName(), "Service Message Counter MBean",
+				attrs, null, opers, null); // no constructors, no notifications
+	}
+
+	/**
+	 * Look up the current value of a single attribute.
+	 * Success and failed counts, and the overall service count, are returned as Integers.
+	 * A processing time attribute is returned as a Double: the accumulated processing time
+	 * divided by the number of successfully processed messages for the same id, or null
+	 * when no message has been processed successfully yet.
+	 * @param key attribute name
+	 * @return the attribute value
+	 * @throws AttributeNotFoundException if this MBean has no attribute with the given name
+	 * @see javax.management.DynamicMBean#getAttribute(java.lang.String)
+	 */
+	public Object getAttribute(String key) throws AttributeNotFoundException, MBeanException, ReflectionException {
+		if (actionCounterHash.containsKey(key)) {
+			return actionCounterHash.get(key);
+		}
+
+		if (actionProcessTimeHash.containsKey(key)) {
+			Long processTotal = actionProcessTimeHash.get(key);
+			// Every key in this hash ends with a space and PROCESSING_TIME; cutting by length
+			// (rather than searching for the text) keeps ids that contain that text intact.
+			String actionId = key.substring(0, key.length() - PROCESSING_TIME.length() - 1);
+			Integer successCount = actionCounterHash.get(actionId + " " + MESSAGE_COUNTER);
+
+			// No average can be computed without at least one successful message.
+			if ((successCount == null) || (successCount.intValue() <= 0)) {
+				return null;
+			}
+			return Double.valueOf(processTotal.doubleValue() / successCount.intValue());
+		}
+
+		if (actionFailedCounterHash.containsKey(key)) {
+			return actionFailedCounterHash.get(key);
+		}
+
+		if (OVERALL_SERVICE_COUNT.equals(key)) {
+			return serviceCount;
+		}
+
+		// Unknown names are reported through the exception the DynamicMBean contract declares
+		// instead of being answered with a null value.
+		throw new AttributeNotFoundException("Attribute [" + key + "] not found on " + getClass().getName());
+	}
+
+	/**
+	 * Return the current values of the attributes of this MBean.
+	 * The requested names are not consulted: the list always holds every success count,
+	 * the overall service count, every processing time and every failed count.  Success
+	 * and failed counts are added as Strings; a processing time is added as the average
+	 * per successful message followed by " ns", or as null when there is no successful
+	 * message to average over.
+	 * @param arg0 names of the requested attributes (currently ignored)
+	 * @return list of all attributes
+	 * @see javax.management.DynamicMBean#getAttributes(java.lang.String[])
+	 */
+	public AttributeList getAttributes(String[] arg0) {
+		AttributeList attributeList = new AttributeList();
+		for (String key : actionCounterHash.keySet()) {
+			attributeList.add(new Attribute(key, actionCounterHash.get(key).toString()));
+		}
+
+		attributeList.add(new Attribute(OVERALL_SERVICE_COUNT, serviceCount));
+
+		for (String key : actionProcessTimeHash.keySet()) {
+			Long processTotal = actionProcessTimeHash.get(key);
+			// Strip the suffix together with the space in front of it.  Keeping that space
+			// would produce a success count key with a double space, which never matches.
+			String actionId = key.substring(0, key.length() - PROCESSING_TIME.length() - 1);
+			Integer successCount = actionCounterHash.get(actionId + " " + MESSAGE_COUNTER);
+			String avgTime = null;
+			if ((successCount != null) && (successCount.intValue() > 0)) {
+				avgTime = (processTotal.doubleValue() / successCount.intValue()) + " ns";
+			}
+			attributeList.add(new Attribute(key, avgTime));
+		}
+
+		for (String key : actionFailedCounterHash.keySet()) {
+			attributeList.add(new Attribute(key, actionFailedCounterHash.get(key).toString()));
+		}
+
+		return attributeList;
+	}
+
+	/* (non-Javadoc)
+	 * @see javax.management.DynamicMBean#invoke(java.lang.String, java.lang.Object[], java.lang.String[])
+	 */
+	public Object invoke(String method, Object[] arg1, String[] arg2) throws MBeanException, ReflectionException {
+		if (method.equalsIgnoreCase(RESET_COUNTER)) {
+				resetCounter();
+			return "Invoking the " + method + " on the lifecycle.";
+		} else {
+			throw new ReflectionException(new NoSuchMethodException(method));
+		}
+	}
+
+	/* (non-Javadoc)
+	 * Intentionally does nothing: the supplied attribute is ignored and no state
+	 * is changed.  getMBeanInfo() publishes every attribute as not writable.
+	 * @see javax.management.DynamicMBean#setAttribute(javax.management.Attribute)
+	 */
+	public void setAttribute(Attribute arg0) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
+	}
+
+	/**
+	 * Setting attributes is not supported; the supplied list is ignored and no state
+	 * is changed.
+	 * @param arg0 attributes to set (ignored)
+	 * @return an empty list, since no attribute was set
+	 * @see javax.management.DynamicMBean#setAttributes(javax.management.AttributeList)
+	 */
+	public AttributeList setAttributes(AttributeList arg0) {
+		// The contract is to return the attributes that were actually set; nothing is
+		// ever set here, so that is an empty list rather than null.
+		return new AttributeList();
+	}
+
+	/**
+	 * Register this MBean with JBoss under the name supplied by getObjectName().
+	 * An MBean already registered under that name is unregistered first.  Nothing is
+	 * registered when no JBoss MBeanServer can be located or when no object name is
+	 * available.  Registration failures are logged and not propagated.
+	 */
+	protected void registerMBean() {
+		MBeanServer mbeanServer = null;
+		try {
+			mbeanServer = MBeanServerLocator.locateJBoss();
+		} catch (IllegalStateException ise) {
+			// If we can't find a JBoss MBeanServer, just return
+			// Needed for unit tests
+			return;
+		}
+
+		ObjectName listObjectName = getObjectName();
+		if (listObjectName == null) {
+			// getObjectName() hands back null when the name could not be built; the
+			// MBeanServer does not accept a null name.
+			logger.warn("No object name available, SmooksMessageCounter MBean not registered.");
+			return;
+		}
+
+		if (mbeanServer.isRegistered(listObjectName)) {
+			try {
+				mbeanServer.unregisterMBean(listObjectName);
+			} catch (InstanceNotFoundException e) {
+				logger.error("Unable to unregister existing MBean [" + listObjectName + "].", e);
+			} catch (MBeanRegistrationException e) {
+				logger.error("Unable to unregister existing MBean [" + listObjectName + "].", e);
+			}
+		}
+
+		try {
+			mbeanServer.registerMBean(this, listObjectName);
+		} catch (InstanceAlreadyExistsException e) {
+			logger.error("Unable to register MBean [" + listObjectName + "].", e);
+		} catch (MBeanRegistrationException e) {
+			logger.error("Unable to register MBean [" + listObjectName + "].", e);
+		} catch (NotCompliantMBeanException e) {
+			logger.error("Unable to register MBean [" + listObjectName + "].", e);
+		}
+	}
+	
+	public void initMessageProfile(String messageProfile) {
+		if (!actionProcessTimeHash.containsKey(messageProfile + " " + PROCESSING_TIME)) {
+			actionProcessTimeHash.put(messageProfile + " " + PROCESSING_TIME, new Long(0));
+		}
+		if (!actionCounterHash.containsKey(messageProfile + " " + MESSAGE_COUNTER)) {
+			actionCounterHash.put(messageProfile + " " + MESSAGE_COUNTER, new Integer(0));
+		}
+		if (!actionFailedCounterHash.containsKey(messageProfile + " " + FAILED_MESSAGE_COUNTER)) {
+			actionCounterHash.put(messageProfile + " " + FAILED_MESSAGE_COUNTER, new Integer(0));
+		}
+	}
+	
+	/**
+	 * Update the ServiceMessageCounter
+	 * @param asb ActionStatusBean
+	 */
+	public void update(long procTime, String messageProfile, String status) {		
+		if ((messageProfile != null) && (!messageProfile.equals(""))) {
+			initMessageProfile(messageProfile);
+		}
+		
+		if (TRANSFORM_SUCCEED.equals(status)) {
+			Integer count = actionCounterHash.get(messageProfile + " " + MESSAGE_COUNTER);
+			count = count.intValue() + 1;
+			actionCounterHash.put(messageProfile + " " + MESSAGE_COUNTER, count);
+			Long time = actionProcessTimeHash.get(messageProfile + " " + PROCESSING_TIME);
+			time = time.longValue() + procTime;
+			actionProcessTimeHash.put(messageProfile + " " + PROCESSING_TIME, time);
+		} else if (TRANSFORM_FAILED.equals(status)) {
+			Integer count = actionFailedCounterHash.get(messageProfile + " " + FAILED_MESSAGE_COUNTER);
+			count = count.intValue() + 1;
+			actionFailedCounterHash.put(messageProfile + " " + FAILED_MESSAGE_COUNTER, count);
+		}		
+	}
+	
+	/**
+	 * Build the JMX object name for this counter.  The name is
+	 * "jboss.esb:category=SmooksMessageCounter", with the deployment name property
+	 * appended unless the "deployment" attribute read from the config tree's
+	 * grandparent is null.
+	 * @return the object name, or null if it could not be constructed
+	 */
+	protected ObjectName getObjectName()
+	{
+		String deploymentName = "";
+
+		// Get Deployment Name
+		try {
+			deploymentName = m_config.getParent().getParent().getAttribute("deployment");
+		} catch (Exception e) {
+			// Best effort: the name is still built, with the empty default deployment name.
+			logger.error("Unable to read the deployment name from the configuration.", e);
+		}
+
+		String properties = "category=SmooksMessageCounter";
+		if (deploymentName != null) {
+			properties = properties + "," + ListenerTagNames.DEPLOYMENT_NAME_TAG + "=" + deploymentName;
+		}
+
+		ObjectName listObjectName = null;
+		try {
+			listObjectName = new ObjectName("jboss.esb:" + properties);
+		} catch (MalformedObjectNameException e1) {
+			logger.error("Unable to create object name from [jboss.esb:" + properties + "].", e1);
+		} catch (NullPointerException e1) {
+			logger.error("Unable to create object name from [jboss.esb:" + properties + "].", e1);
+		}
+		return listObjectName;
+	}
+}

Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksTransformer.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksTransformer.java	2007-08-27 12:01:16 UTC (rev 14635)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/actions/converters/SmooksTransformer.java	2007-08-27 12:31:17 UTC (rev 14636)
@@ -31,6 +31,8 @@
 import org.jboss.soa.esb.actions.ActionUtils;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.message.ActionStatusBean;
+import org.jboss.soa.esb.listeners.message.ServiceMessageCounter;
 import org.jboss.soa.esb.message.Body;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.transform.TransformationException;
@@ -164,6 +166,8 @@
     private String defaultMessageToType;
     private String defaultMessageTo;
     private boolean setPayloadToBeanHash = false;
+    
+    private SmooksMessageCounter smooksMessageCounter;
 
     /**
      * Public constructor.
@@ -178,7 +182,7 @@
         inputLocation = KeyValuePair.getValue(INPUT_LOCATION, properties, Body.DEFAULT_LOCATION);
         // if no output location given, then assume default location in message body.
         outputLocation = KeyValuePair.getValue(OUTPUT_LOCATION, properties, Body.DEFAULT_LOCATION);
-
+        
         // Get the default message flow properties (can be overriden by the message properties)...
 		defaultMessageFromType = KeyValuePair.getValue(FROM_TYPE, properties);
 		if(defaultMessageFromType != null && defaultMessageFromType.trim().equals("")) {
@@ -196,7 +200,9 @@
 		if(defaultMessageTo != null && defaultMessageTo.trim().equals("")) {
 			throw new ConfigurationException("Empty '" + TO + "' config attribute supplied.");
 		}
-
+		smooksMessageCounter = new SmooksMessageCounter(propertiesTree);
+		smooksMessageCounter.registerMBean();
+		
         // Get the set-beanhash-to-payload property...
 		setPayloadToBeanHash = KeyValuePair.getBooleanValue(SET_BEANHASH_TO_PAYLOAD, properties, false);
 
@@ -223,7 +229,7 @@
         if(smooksInstanceManager == null) {
             throw new ActionLifecycleException("Invalid " + getClass().getSimpleName() + " action configuration.  No 'resource-config' specified on the action and no centralised console config available (see smooks.esb.properties).");
         }
-
+        
         logger.info("Smooks configurations are now loaded.");
     }
 
@@ -270,11 +276,15 @@
      * @see org.jboss.soa.esb.actions.ActionProcessor#process(java.lang.Object)
      */
     public Message process(Message message) throws ActionProcessingException {
-        if(smooksInstanceManager == null) {
+        String messageProfile = "";
+
+    	if(smooksInstanceManager == null) {
             logger.warn("SmooksTransformer instance is decommisioned.  Probably due to a bad action configuration. Not performing transformation on this message.");
             return message;
         }
-
+        
+        long startTime = System.nanoTime();
+        
         Object payload = message.getBody().get(inputLocation);
 
         if(payload == null) {
@@ -297,7 +307,6 @@
 	            long start = System.currentTimeMillis();
                 Smooks smooks = smooksInstanceManager.getSmooksInstance();
                 StandaloneExecutionContext executionContext;
-                String messageProfile;
 
                 // Register the message profile with Smooks (if there is one and it's not already registered)...
                 messageProfile = registerMessageProfile(message, smooks);
@@ -336,9 +345,18 @@
             } else {
 	            logger.warn("Only java.lang.String payload types supported.  Input message was of type [" + payload.getClass().getName() + "].  Returning message untransformed.");
 	        }
-    	} catch(Throwable thrown) {
+            
+            long procTime = System.nanoTime() - startTime;
+			smooksMessageCounter.update(procTime, messageProfile, SmooksMessageCounter.TRANSFORM_SUCCEED);
+
+        } catch(Throwable thrown) {
+    		long procTime = System.nanoTime() - startTime;
+			smooksMessageCounter.update(procTime, messageProfile, SmooksMessageCounter.TRANSFORM_FAILED);
+
     		thrown.printStackTrace();
     		throw new ActionProcessingException("Message transformation failed.", thrown);
+    	} finally {
+    		smooksMessageCounter.incrementTotalCount();
     	}
         
         // TODO: Cater for more message input types e.g. InputStream, DOM Document...




More information about the jboss-svn-commits mailing list