[jboss-svn-commits] JBL Code SVN: r20926 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: listeners/message and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Jul 6 00:33:10 EDT 2008


Author: tcunning
Date: 2008-07-06 00:33:09 -0400 (Sun, 06 Jul 2008)
New Revision: 20926

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java
Log:
bug:JBESB-1654
Add code to track byte size of message at serialization time for monitoring.


Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -161,6 +161,7 @@
 	public static final String MESSAGE_SOURCE = "org.jboss.soa.esb.message.source";
 	public static final String MESSAGE_ENTRY_TIME = "org.jboss.soa.esb.message.time.dob"; // time born
 	public static final String MESSAGE_EXIT_TIME = "org.jboss.soa.esb.message.time.dod";  // time died
+	public static final String MESSAGE_BYTE_SIZE = "org.jboss.soa.esb.message.byte.size"; // size
 	
 	/** Message property name for original filename */
 	

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -353,7 +353,7 @@
 					}
 
 					long procTime = System.nanoTime() - start;
-					serviceMessageCounter.update(new ActionStatusBean(procTime, count,
+					serviceMessageCounter.update(new ActionStatusBean(procTime, count, message,
 							ActionStatusBean.ACTION_FAILED));
 		        	DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message, 
 		        			MessageStatusBean.MESSAGE_FAILED));
@@ -364,7 +364,7 @@
 		        	        }
 					return false;
 				}
-				serviceMessageCounter.update(new ActionStatusBean((System.nanoTime() - start), count,
+				serviceMessageCounter.update(new ActionStatusBean((System.nanoTime() - start), count, message,
 						ActionStatusBean.ACTION_SENT));
 			}
 

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -20,8 +20,11 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-package org.jboss.soa.esb.listeners.message;
+package org.jboss.soa.esb.listeners.message; 
 
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.common.Environment;
+
 /**
  * ActionStatusBean is a bean which wraps the information necessary to update the
  * ServiceMessageCounter MBean.
@@ -33,7 +36,9 @@
 	private int m_procCount;
 	private String m_status;
 	private long m_procTime;
+	private int m_bytesProcessed;
 	
+	
 	public static final String ACTION_SENT = "SENT";
 	public static final String ACTION_FAILED = "FAILED";
 	
@@ -43,12 +48,22 @@
 	 * @param f_procCount counter that identifies the action processor in the ConfigTree
 	 * @param f_status status string
 	 */
-	public ActionStatusBean(long procTime, int f_procCount, String f_status) {
+	public ActionStatusBean(long procTime, int f_procCount, Message f_message, String f_status) {
 		m_procTime = procTime;
 		m_procCount = f_procCount;
 		m_status = f_status;
+		m_bytesProcessed = 0;
+		try {
+			String bytes = (String) f_message.getProperties().getProperty(Environment.MESSAGE_BYTE_SIZE);
+			m_bytesProcessed = Integer.parseInt(bytes);
+		} catch (NullPointerException npe) {			
+		}
 	}
 	
+	public int getBytesProcessed() {
+		return m_bytesProcessed;
+	}
+	
 	/**
 	 * Process counter getter.
 	 * @return counter that identifies the action processor in the ConfigTree

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -40,6 +40,8 @@
 	private int failedMessageCount;
 	private int successMessageCount;
 	private long totalProcessTime;
+	private int bytesProcessed;
+	private int bytesFailed;
 	
 	/**
 	 * Constructor.
@@ -64,7 +66,7 @@
 	public String getLastSuccessfulMessageDate() {
 		return lastSuccessfulMessageDate;
 	}
-
+	
 	/**
 	 * Get the time the last message was processed at.
 	 * @return time the last message was processed at
@@ -89,6 +91,14 @@
 		return successMessageCount;
 	}
 
+	public int getBytesProcessed() {
+		return bytesProcessed;
+	}
+	
+	public int getBytesFailed() {
+		return bytesFailed;
+	}
+	
 	/**
 	 * Update the message counter based on the MessageStatusBean that is returned
 	 * from the observable.
@@ -102,10 +112,12 @@
 			successMessageCount++;
 			Timestamp ts = new Timestamp(msb.getMessageTime());
 			lastSuccessfulMessageDate = ts.toString();
+			bytesProcessed += msb.getMessageBytes();
 		} else if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_FAILED)) {
 			failedMessageCount++;
 			Timestamp ts = new Timestamp(msb.getMessageTime());
 			lastFailedMessageDate = ts.toString();
+			bytesFailed += msb.getMessageBytes();
 		}
 		
 		totalProcessTime += msb.getProcessTime();
@@ -133,5 +145,7 @@
 		successMessageCount = 0;
 		failedMessageCount = 0;
 		totalProcessTime = 0;
+		bytesProcessed = 0;
+		bytesFailed = 0;
 	}
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -43,4 +43,8 @@
 	public Double getAverageSuccessTime();
 		
 	public void resetCounts();
+	
+	public int getBytesProcessed();
+	
+	public int getBytesFailed();
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -20,6 +20,7 @@
 package org.jboss.soa.esb.listeners.message;
 
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.common.Environment;
 
 /**
  * Bean that stores data on the message status.    Since the Observer/Observable
@@ -89,6 +90,22 @@
 	public void setMessageStatus(String f_status) {
 		this.m_status = f_status;
 	}
+
+	/**
+	 * Message number of bytes getter
+	 * @return message bytes
+	 */
+	public int getMessageBytes() {
+		int messageBytes = 0;
+		try {
+			String byteSizeString = (String) this.m_msg.getProperties().getProperty(Environment.MESSAGE_BYTE_SIZE);
+			if (byteSizeString != null) {
+				messageBytes = Integer.parseInt(byteSizeString);
+			}
+		} catch (NullPointerException npe) {
+		}
+		return messageBytes;
+	}
 	
 	/**
 	 * Message time getter.

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -57,15 +57,26 @@
 	private Hashtable<String, Integer> actionCounterHash;
 	private Hashtable<String, Integer> actionFailedCounterHash;
 	private Hashtable<String, Long> actionProcessTimeHash;
+	private Hashtable<String, Integer> actionBytesProcessedHash;
+	private Hashtable<String, Integer> actionBytesFailedHash;
+	
 	private ConfigTree m_config;
 	private ConfigTree[] actionArray;
+	
 	private Integer serviceCount;
+	private Integer bytesFailed;
+	private Integer bytesProcessed;
 	
 	public static final String RESET_COUNTER = "resetCounter";
 	private static final String MESSAGE_COUNTER = "messages successfully processed count";
 	private static final String FAILED_MESSAGE_COUNTER = "messages failed count";
 	private static final String PROCESSING_TIME = "processing time";
+	private static final String BYTES_FAILED = "bytes failed";
+	private static final String BYTES_PROCESSED = "bytes processed";
+	
 	private static final String OVERALL_SERVICE_COUNT = "overall service message count";
+	private static final String OVERALL_BYTES_PROCESSED = "overall bytes processed";
+	private static final String OVERALL_BYTES_FAILED = "overall bytes failed";
 	
 	/**
 	 * Constructor
@@ -75,7 +86,12 @@
 		actionCounterHash = new Hashtable<String, Integer>();
 		actionFailedCounterHash = new Hashtable<String, Integer>();
 		actionProcessTimeHash = new Hashtable<String, Long>();
+		actionBytesProcessedHash = new Hashtable<String, Integer>();
+		actionBytesFailedHash = new Hashtable<String, Integer>();
+		
 		serviceCount = new Integer(0);
+		bytesProcessed = new Integer(0);
+		bytesFailed = new Integer(0);
 		
 		m_config = f_config;
 		actionArray = m_config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
@@ -116,8 +132,12 @@
 		 	actionCounterHash.put(actionId + " " + MESSAGE_COUNTER, new Integer(0));
 			actionFailedCounterHash.put(actionId + " " + FAILED_MESSAGE_COUNTER, new Integer(0));
 			actionProcessTimeHash.put(actionId + " " + PROCESSING_TIME, new Long(0));
+			actionBytesFailedHash.put(actionId + " " + BYTES_FAILED, new Integer(0));
+			actionBytesProcessedHash.put(actionId + " " + BYTES_PROCESSED, new Integer(0));
 		}		
 		serviceCount = new Integer(0);
+		bytesProcessed = new Integer(0);
+		bytesFailed = new Integer(0);
 	}
 	
 	/**
@@ -126,6 +146,8 @@
 	 */
 	public void resetCounter() {
 		serviceCount = new Integer(0);
+		bytesProcessed = new Integer(0);
+		bytesFailed = new Integer(0);
 		
 		for (String key : actionCounterHash.keySet()) {
 			actionCounterHash.put(key, new Integer(0));
@@ -138,6 +160,14 @@
 		for (String key : actionProcessTimeHash.keySet()) {
 			actionProcessTimeHash.put(key, new Long(0));
 		}
+		
+		for (String key : actionBytesFailedHash.keySet()) {
+			actionBytesFailedHash.put(key, new Integer(0));
+		}
+		
+		for (String key : actionBytesProcessedHash.keySet()) {
+			actionBytesProcessedHash.put(key, new Integer(0));
+		}
 	}
 	
 	/**
@@ -149,7 +179,9 @@
     public MBeanInfo getMBeanInfo() {
 		
 		int count = actionCounterHash.size() + actionProcessTimeHash.size()
-			+ actionFailedCounterHash.size() + 1;
+			+ actionFailedCounterHash.size() + actionBytesProcessedHash.size() 
+			+ actionBytesFailedHash.size() + 3; // the extra 3 here are overall service count, failed byte size
+												  // and processed byte size
         MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[count];
         int counter = 0;
               
@@ -171,11 +203,33 @@
             counter++;
 		}
 		
+		for (String key : actionBytesFailedHash.keySet()) {
+			attrs[counter] = new MBeanAttributeInfo(
+					key, "java.lang.Integer", "Property " + key, true, false, false);
+			counter++;
+		}
+		
+		for (String key : actionBytesProcessedHash.keySet()) {
+			attrs[counter] = new MBeanAttributeInfo(
+					key, "java.lang.Integer", "Property " + key, true, false, false);
+			counter++;
+		}		
+		
 		MBeanAttributeInfo overallCount = new MBeanAttributeInfo(OVERALL_SERVICE_COUNT, "java.lang.Integer",
 				"Property " + OVERALL_SERVICE_COUNT, true, false, false);
 		attrs[counter] = overallCount;
 		counter++;
-			
+
+		MBeanAttributeInfo overallBytesProcessed = new MBeanAttributeInfo(OVERALL_BYTES_PROCESSED, "java.lang.Integer",
+				"Property " + OVERALL_BYTES_PROCESSED, true, false, false);
+		attrs[counter] = overallBytesProcessed;
+		counter++;
+		
+		MBeanAttributeInfo overallBytesFailed = new MBeanAttributeInfo(OVERALL_BYTES_FAILED, "java.lang.Integer",
+				"Property " + OVERALL_BYTES_FAILED, true, false, false);
+		attrs[counter] = overallBytesFailed;
+		counter++;
+		
         MBeanOperationInfo[] opers = {
         	new MBeanOperationInfo(
         			RESET_COUNTER, "Reset the counter",
@@ -209,8 +263,18 @@
     	} else if (actionFailedCounterHash.containsKey(key)) {
     		Integer value = actionFailedCounterHash.get(key);
     		return value;
+    	} else if (actionBytesFailedHash.containsKey(key)) {
+    		Integer value = actionBytesFailedHash.get(key);
+    		return value;
+    	} else if (actionBytesProcessedHash.containsKey(key)) {
+    		Integer value = actionBytesProcessedHash.get(key);
+    		return value;
     	} else if (OVERALL_SERVICE_COUNT.equals(key)) {
     		return serviceCount;
+    	} else if (OVERALL_BYTES_PROCESSED.equals(key)) {
+    		return bytesProcessed;
+    	} else if (OVERALL_BYTES_FAILED.equals(key)) {
+    		return bytesFailed;
     	}
     	return null;
     }
@@ -245,6 +309,16 @@
 			attributeList.add(at);
 		}
 
+		for (String key : actionBytesFailedHash.keySet()) {
+			Attribute at = new Attribute(key, actionBytesFailedHash.get(key).toString());
+			attributeList.add(at);
+		}
+		
+		for (String key : actionBytesProcessedHash.keySet()) {
+			Attribute at = new Attribute(key, actionBytesProcessedHash.get(key).toString());
+			attributeList.add(at);
+		}
+		
 		return attributeList;
 	}
 
@@ -323,10 +397,18 @@
 			Long time = actionProcessTimeHash.get(actionName + " " + PROCESSING_TIME);
 			time = time.longValue() + asb.getProcTime();
 			actionProcessTimeHash.put(actionName + " " + PROCESSING_TIME, time);
+			Integer bProcessed = actionBytesProcessedHash.get(actionName + " " +  BYTES_PROCESSED);
+			bProcessed = bProcessed.intValue() + asb.getBytesProcessed();
+			actionBytesProcessedHash.put(actionName + " " + BYTES_PROCESSED, bProcessed);
+			bytesProcessed += asb.getBytesProcessed();
 		} else if (ActionStatusBean.ACTION_FAILED.equals(asb.getStatus())) {
 			Integer count = actionFailedCounterHash.get(actionName + " " + FAILED_MESSAGE_COUNTER);
 			count = count.intValue() + 1;
 			actionFailedCounterHash.put(actionName + " " + FAILED_MESSAGE_COUNTER, count);
+			bytesFailed += asb.getBytesProcessed();
+			Integer bFailed = actionBytesFailedHash.get(actionName + " " +  BYTES_FAILED);
+			bFailed = bFailed.intValue() + asb.getBytesProcessed();
+			actionBytesProcessedHash.put(actionName + " " + BYTES_FAILED, bFailed);
 		}		
 	}
 	

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java	2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java	2008-07-06 04:33:09 UTC (rev 20926)
@@ -54,6 +54,7 @@
 import org.jboss.internal.soa.esb.message.format.xml.XMLUtil;
 import org.jboss.internal.soa.esb.util.XMLHelper;
 import org.jboss.internal.soa.esb.util.stax.StreamHelper;
+import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
 import org.jboss.soa.esb.helpers.KeyValuePair;
 import org.jboss.soa.esb.message.Message;
@@ -181,13 +182,16 @@
 
 	try
 	{
-	    final StringWriter writer = new StringWriter() ;
-	    final XMLStreamWriter out = XMLHelper.getXMLStreamWriter(writer) ;
-	    final String origURI = StreamHelper.writeStartElement(out, XMLUtil.ESB_QNAME_ENVELOPE) ;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final XMLStreamWriter out = XMLHelper.getXMLStreamWriter(baos) ;
+        final String origURI = StreamHelper.writeStartElement(out, XMLUtil.ESB_QNAME_ENVELOPE) ;
             ((MessageImpl) message).writeContent(out) ;
-	    StreamHelper.writeEndElement(out, XMLUtil.ESB_QNAME_ENVELOPE.getPrefix(), origURI) ;
-	    out.flush();
-	    return writer.toString() ;
+        StreamHelper.writeEndElement(out, XMLUtil.ESB_QNAME_ENVELOPE.getPrefix(), origURI) ;
+        out.flush();
+        int size = baos.toByteArray().length;
+        String outputString = baos.toString();
+        message.getProperties().setProperty(Environment.MESSAGE_BYTE_SIZE, "" + size);
+        return baos.toString();
 	}
 	catch (final XMLStreamException xmlse)
 	{
@@ -207,11 +211,14 @@
 
 	try
 	{
-            // MessageType.JBOSS_XML
-	    final StringReader reader = new StringReader((String)serial) ;
+        // MessageType.JBOSS_XML
+		int size = ((String)serial).getBytes().length;
+	    final StringReader reader = new StringReader((String)serial);
 	    final XMLStreamReader in = XMLHelper.getXMLStreamReader(reader) ;
 	    StreamHelper.checkNextStartTag(in, XMLUtil.ESB_QNAME_ENVELOPE) ;
-	    return new MessageImpl(in) ;
+	    Message mess = new MessageImpl(in);
+	    mess.getProperties().setProperty(Environment.MESSAGE_BYTE_SIZE, "" + size);
+	    return mess;
 	}
 	catch (XMLStreamException xmlse)
 	{




More information about the jboss-svn-commits mailing list