[jboss-svn-commits] JBL Code SVN: r20926 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: common, listeners/message and util.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Jul 6 00:33:10 EDT 2008
Author: tcunning
Date: 2008-07-06 00:33:09 -0400 (Sun, 06 Jul 2008)
New Revision: 20926
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java
Log:
bug:JBESB-1654
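Track the byte size of each message for monitoring: the size is recorded in the org.jboss.soa.esb.message.byte.size message property when a message is serialized or deserialized, and the message counter MBeans expose processed/failed byte totals (per action and overall).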
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -161,6 +161,7 @@
public static final String MESSAGE_SOURCE = "org.jboss.soa.esb.message.source";
public static final String MESSAGE_ENTRY_TIME = "org.jboss.soa.esb.message.time.dob"; // time born
public static final String MESSAGE_EXIT_TIME = "org.jboss.soa.esb.message.time.dod"; // time died
+ public static final String MESSAGE_BYTE_SIZE = "org.jboss.soa.esb.message.byte.size"; // size
/** Message property name for original filename */
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -353,7 +353,7 @@
}
long procTime = System.nanoTime() - start;
- serviceMessageCounter.update(new ActionStatusBean(procTime, count,
+ serviceMessageCounter.update(new ActionStatusBean(procTime, count, message,
ActionStatusBean.ACTION_FAILED));
DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message,
MessageStatusBean.MESSAGE_FAILED));
@@ -364,7 +364,7 @@
}
return false;
}
- serviceMessageCounter.update(new ActionStatusBean((System.nanoTime() - start), count,
+ serviceMessageCounter.update(new ActionStatusBean((System.nanoTime() - start), count, message,
ActionStatusBean.ACTION_SENT));
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionStatusBean.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -20,8 +20,11 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.soa.esb.listeners.message;
+package org.jboss.soa.esb.listeners.message;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.common.Environment;
+
/**
* ActionStatusBean is a bean which wraps the information necessary to update the
* ServiceMessageCounter MBean.
@@ -33,7 +36,9 @@
private int m_procCount;
private String m_status;
private long m_procTime;
+ private int m_bytesProcessed;
+
public static final String ACTION_SENT = "SENT";
public static final String ACTION_FAILED = "FAILED";
@@ -43,12 +48,22 @@
* @param f_procCount counter that identifies the action processor in the ConfigTree
* @param f_status status string
*/
- public ActionStatusBean(long procTime, int f_procCount, String f_status) {
+ public ActionStatusBean(long procTime, int f_procCount, Message f_message, String f_status) {
m_procTime = procTime;
m_procCount = f_procCount;
m_status = f_status;
+ m_bytesProcessed = 0;
+ try {
+ String bytes = (String) f_message.getProperties().getProperty(Environment.MESSAGE_BYTE_SIZE);
+ m_bytesProcessed = Integer.parseInt(bytes);
+ } catch (NullPointerException npe) {
+ }
}
+ public int getBytesProcessed() {
+ return m_bytesProcessed;
+ }
+
/**
* Process counter getter.
* @return counter that identifies the action processor in the ConfigTree
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -40,6 +40,8 @@
private int failedMessageCount;
private int successMessageCount;
private long totalProcessTime;
+ private int bytesProcessed;
+ private int bytesFailed;
/**
* Constructor.
@@ -64,7 +66,7 @@
public String getLastSuccessfulMessageDate() {
return lastSuccessfulMessageDate;
}
-
+
/**
* Get the time the last message was processed at.
* @return time the last message was processed at
@@ -89,6 +91,14 @@
return successMessageCount;
}
+ public int getBytesProcessed() {
+ return bytesProcessed;
+ }
+
+ public int getBytesFailed() {
+ return bytesFailed;
+ }
+
/**
* Update the message counter based on the MessageStatusBean that is returned
* from the observable.
@@ -102,10 +112,12 @@
successMessageCount++;
Timestamp ts = new Timestamp(msb.getMessageTime());
lastSuccessfulMessageDate = ts.toString();
+ bytesProcessed += msb.getMessageBytes();
} else if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_FAILED)) {
failedMessageCount++;
Timestamp ts = new Timestamp(msb.getMessageTime());
lastFailedMessageDate = ts.toString();
+ bytesFailed += msb.getMessageBytes();
}
totalProcessTime += msb.getProcessTime();
@@ -133,5 +145,7 @@
successMessageCount = 0;
failedMessageCount = 0;
totalProcessTime = 0;
+ bytesProcessed = 0;
+ bytesFailed = 0;
}
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -43,4 +43,8 @@
public Double getAverageSuccessTime();
public void resetCounts();
+
+ public int getBytesProcessed();
+
+ public int getBytesFailed();
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageStatusBean.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -20,6 +20,7 @@
package org.jboss.soa.esb.listeners.message;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.common.Environment;
/**
* Bean that stores data on the message status. Since the Observer/Observable
@@ -89,6 +90,22 @@
public void setMessageStatus(String f_status) {
this.m_status = f_status;
}
+
+ /**
+ * Getter for the message size in bytes.
+ * @return message size in bytes, or 0 if no size is recorded on the message
+ */
+ public int getMessageBytes() {
+ int messageBytes = 0;
+ try {
+ String byteSizeString = (String) this.m_msg.getProperties().getProperty(Environment.MESSAGE_BYTE_SIZE);
+ if (byteSizeString != null) {
+ messageBytes = Integer.parseInt(byteSizeString);
+ }
+ } catch (NullPointerException npe) {
+ }
+ return messageBytes;
+ }
/**
* Message time getter.
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ServiceMessageCounter.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -57,15 +57,26 @@
private Hashtable<String, Integer> actionCounterHash;
private Hashtable<String, Integer> actionFailedCounterHash;
private Hashtable<String, Long> actionProcessTimeHash;
+ private Hashtable<String, Integer> actionBytesProcessedHash;
+ private Hashtable<String, Integer> actionBytesFailedHash;
+
private ConfigTree m_config;
private ConfigTree[] actionArray;
+
private Integer serviceCount;
+ private Integer bytesFailed;
+ private Integer bytesProcessed;
public static final String RESET_COUNTER = "resetCounter";
private static final String MESSAGE_COUNTER = "messages successfully processed count";
private static final String FAILED_MESSAGE_COUNTER = "messages failed count";
private static final String PROCESSING_TIME = "processing time";
+ private static final String BYTES_FAILED = "bytes failed";
+ private static final String BYTES_PROCESSED = "bytes processed";
+
private static final String OVERALL_SERVICE_COUNT = "overall service message count";
+ private static final String OVERALL_BYTES_PROCESSED = "overall bytes processed";
+ private static final String OVERALL_BYTES_FAILED = "overall bytes failed";
/**
* Constructor
@@ -75,7 +86,12 @@
actionCounterHash = new Hashtable<String, Integer>();
actionFailedCounterHash = new Hashtable<String, Integer>();
actionProcessTimeHash = new Hashtable<String, Long>();
+ actionBytesProcessedHash = new Hashtable<String, Integer>();
+ actionBytesFailedHash = new Hashtable<String, Integer>();
+
serviceCount = new Integer(0);
+ bytesProcessed = new Integer(0);
+ bytesFailed = new Integer(0);
m_config = f_config;
actionArray = m_config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
@@ -116,8 +132,12 @@
actionCounterHash.put(actionId + " " + MESSAGE_COUNTER, new Integer(0));
actionFailedCounterHash.put(actionId + " " + FAILED_MESSAGE_COUNTER, new Integer(0));
actionProcessTimeHash.put(actionId + " " + PROCESSING_TIME, new Long(0));
+ actionBytesFailedHash.put(actionId + " " + BYTES_FAILED, new Integer(0));
+ actionBytesProcessedHash.put(actionId + " " + BYTES_PROCESSED, new Integer(0));
}
serviceCount = new Integer(0);
+ bytesProcessed = new Integer(0);
+ bytesFailed = new Integer(0);
}
/**
@@ -126,6 +146,8 @@
*/
public void resetCounter() {
serviceCount = new Integer(0);
+ bytesProcessed = new Integer(0);
+ bytesFailed = new Integer(0);
for (String key : actionCounterHash.keySet()) {
actionCounterHash.put(key, new Integer(0));
@@ -138,6 +160,14 @@
for (String key : actionProcessTimeHash.keySet()) {
actionProcessTimeHash.put(key, new Long(0));
}
+
+ for (String key : actionBytesFailedHash.keySet()) {
+ actionBytesFailedHash.put(key, new Integer(0));
+ }
+
+ for (String key : actionBytesProcessedHash.keySet()) {
+ actionBytesProcessedHash.put(key, new Integer(0));
+ }
}
/**
@@ -149,7 +179,9 @@
public MBeanInfo getMBeanInfo() {
int count = actionCounterHash.size() + actionProcessTimeHash.size()
- + actionFailedCounterHash.size() + 1;
+ + actionFailedCounterHash.size() + actionBytesProcessedHash.size()
+ + actionBytesFailedHash.size() + 3; // the extra 3 here are overall service count, failed byte size
+ // and processed byte size
MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[count];
int counter = 0;
@@ -171,11 +203,33 @@
counter++;
}
+ for (String key : actionBytesFailedHash.keySet()) {
+ attrs[counter] = new MBeanAttributeInfo(
+ key, "java.lang.Integer", "Property " + key, true, false, false);
+ counter++;
+ }
+
+ for (String key : actionBytesProcessedHash.keySet()) {
+ attrs[counter] = new MBeanAttributeInfo(
+ key, "java.lang.Integer", "Property " + key, true, false, false);
+ counter++;
+ }
+
MBeanAttributeInfo overallCount = new MBeanAttributeInfo(OVERALL_SERVICE_COUNT, "java.lang.Integer",
"Property " + OVERALL_SERVICE_COUNT, true, false, false);
attrs[counter] = overallCount;
counter++;
-
+
+ MBeanAttributeInfo overallBytesProcessed = new MBeanAttributeInfo(OVERALL_BYTES_PROCESSED, "java.lang.Integer",
+ "Property " + OVERALL_BYTES_PROCESSED, true, false, false);
+ attrs[counter] = overallBytesProcessed;
+ counter++;
+
+ MBeanAttributeInfo overallBytesFailed = new MBeanAttributeInfo(OVERALL_BYTES_FAILED, "java.lang.Integer",
+ "Property " + OVERALL_BYTES_FAILED, true, false, false);
+ attrs[counter] = overallBytesFailed;
+ counter++;
+
MBeanOperationInfo[] opers = {
new MBeanOperationInfo(
RESET_COUNTER, "Reset the counter",
@@ -209,8 +263,18 @@
} else if (actionFailedCounterHash.containsKey(key)) {
Integer value = actionFailedCounterHash.get(key);
return value;
+ } else if (actionBytesFailedHash.containsKey(key)) {
+ Integer value = actionBytesFailedHash.get(key);
+ return value;
+ } else if (actionBytesProcessedHash.containsKey(key)) {
+ Integer value = actionBytesProcessedHash.get(key);
+ return value;
} else if (OVERALL_SERVICE_COUNT.equals(key)) {
return serviceCount;
+ } else if (OVERALL_BYTES_PROCESSED.equals(key)) {
+ return bytesProcessed;
+ } else if (OVERALL_BYTES_FAILED.equals(key)) {
+ return bytesFailed;
}
return null;
}
@@ -245,6 +309,16 @@
attributeList.add(at);
}
+ for (String key : actionBytesFailedHash.keySet()) {
+ Attribute at = new Attribute(key, actionBytesFailedHash.get(key).toString());
+ attributeList.add(at);
+ }
+
+ for (String key : actionBytesProcessedHash.keySet()) {
+ Attribute at = new Attribute(key, actionBytesProcessedHash.get(key).toString());
+ attributeList.add(at);
+ }
+
return attributeList;
}
@@ -323,10 +397,18 @@
Long time = actionProcessTimeHash.get(actionName + " " + PROCESSING_TIME);
time = time.longValue() + asb.getProcTime();
actionProcessTimeHash.put(actionName + " " + PROCESSING_TIME, time);
+ Integer bProcessed = actionBytesProcessedHash.get(actionName + " " + BYTES_PROCESSED);
+ bProcessed = bProcessed.intValue() + asb.getBytesProcessed();
+ actionBytesProcessedHash.put(actionName + " " + BYTES_PROCESSED, bProcessed);
+ bytesProcessed += asb.getBytesProcessed();
} else if (ActionStatusBean.ACTION_FAILED.equals(asb.getStatus())) {
Integer count = actionFailedCounterHash.get(actionName + " " + FAILED_MESSAGE_COUNTER);
count = count.intValue() + 1;
actionFailedCounterHash.put(actionName + " " + FAILED_MESSAGE_COUNTER, count);
+ bytesFailed += asb.getBytesProcessed();
+ Integer bFailed = actionBytesFailedHash.get(actionName + " " + BYTES_FAILED);
+ bFailed = bFailed.intValue() + asb.getBytesProcessed();
+ actionBytesProcessedHash.put(actionName + " " + BYTES_FAILED, bFailed);
}
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java 2008-07-06 02:04:01 UTC (rev 20925)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/util/Util.java 2008-07-06 04:33:09 UTC (rev 20926)
@@ -54,6 +54,7 @@
import org.jboss.internal.soa.esb.message.format.xml.XMLUtil;
import org.jboss.internal.soa.esb.util.XMLHelper;
import org.jboss.internal.soa.esb.util.stax.StreamHelper;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.message.Message;
@@ -181,13 +182,16 @@
try
{
- final StringWriter writer = new StringWriter() ;
- final XMLStreamWriter out = XMLHelper.getXMLStreamWriter(writer) ;
- final String origURI = StreamHelper.writeStartElement(out, XMLUtil.ESB_QNAME_ENVELOPE) ;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final XMLStreamWriter out = XMLHelper.getXMLStreamWriter(baos) ;
+ final String origURI = StreamHelper.writeStartElement(out, XMLUtil.ESB_QNAME_ENVELOPE) ;
((MessageImpl) message).writeContent(out) ;
- StreamHelper.writeEndElement(out, XMLUtil.ESB_QNAME_ENVELOPE.getPrefix(), origURI) ;
- out.flush();
- return writer.toString() ;
+ StreamHelper.writeEndElement(out, XMLUtil.ESB_QNAME_ENVELOPE.getPrefix(), origURI) ;
+ out.flush();
+ int size = baos.toByteArray().length;
+ String outputString = baos.toString();
+ message.getProperties().setProperty(Environment.MESSAGE_BYTE_SIZE, "" + size);
+ return baos.toString();
}
catch (final XMLStreamException xmlse)
{
@@ -207,11 +211,14 @@
try
{
- // MessageType.JBOSS_XML
- final StringReader reader = new StringReader((String)serial) ;
+ // MessageType.JBOSS_XML
+ int size = ((String)serial).getBytes().length;
+ final StringReader reader = new StringReader((String)serial);
final XMLStreamReader in = XMLHelper.getXMLStreamReader(reader) ;
StreamHelper.checkNextStartTag(in, XMLUtil.ESB_QNAME_ENVELOPE) ;
- return new MessageImpl(in) ;
+ Message mess = new MessageImpl(in);
+ mess.getProperties().setProperty(Environment.MESSAGE_BYTE_SIZE, "" + size);
+ return mess;
}
catch (XMLStreamException xmlse)
{
More information about the jboss-svn-commits
mailing list