[jboss-svn-commits] JBL Code SVN: r23190 - labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 29 12:35:28 EDT 2008


Author: kevin.conner at jboss.com
Date: 2008-09-29 12:35:28 -0400 (Mon, 29 Sep 2008)
New Revision: 23190

Added:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterStatistics.java
Modified:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/DeliveryObservableLogger.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
Log:
Fix MessageCounter concurrency: JBESB-2071

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2008-09-29 16:21:38 UTC (rev 23189)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2008-09-29 16:35:28 UTC (rev 23190)
@@ -22,7 +22,6 @@
 
 package org.jboss.soa.esb.listeners.message;
 
-import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -401,7 +400,7 @@
 
 			faultTo(callDetails, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
 			long procTime = System.nanoTime() - start;
-        	DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message,
+        	MessageCounterStatistics.getMessageCounterStatistics().update(new MessageStatusBean(procTime, message,
         			MessageStatusBean.MESSAGE_FAILED));
 
 			return false;
@@ -552,7 +551,7 @@
 						final long totalProcTime = System.nanoTime() - start;
 						serviceMessageCounter.update(new ActionStatusBean(procTime, count, message,
 							ActionStatusBean.ACTION_FAILED));
-						DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(totalProcTime, message,
+						MessageCounterStatistics.getMessageCounterStatistics().update(new MessageStatusBean(totalProcTime, message,
 							MessageStatusBean.MESSAGE_FAILED));
 
 						if (throwRuntime)
@@ -599,7 +598,7 @@
 				{
 					notifySuccess(messages);
 					long procTime = System.nanoTime() - start;
-					DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message,
+					MessageCounterStatistics.getMessageCounterStatistics().update(new MessageStatusBean(procTime, message,
 						MessageStatusBean.MESSAGE_SENT));
 					result = true;
 				}
@@ -610,7 +609,7 @@
 				final MessageValidationException mve = new MessageValidationException(validationFailure) ;
 				faultTo(callDetails, Factory.createErrorMessage(Factory.VALIDATION_FAILURE, message, mve));
 				long procTime = System.nanoTime() - start;
-				DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message,
+				MessageCounterStatistics.getMessageCounterStatistics().update(new MessageStatusBean(procTime, message,
 					MessageStatusBean.MESSAGE_FAILED));
 			}
 			return result ;

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/DeliveryObservableLogger.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/DeliveryObservableLogger.java	2008-09-29 16:21:38 UTC (rev 23189)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/DeliveryObservableLogger.java	2008-09-29 16:35:28 UTC (rev 23190)
@@ -25,6 +25,7 @@
  * DeliveryObservableLogger is an observable which reports out to Observers.
  *
  * @author <a href="mailto:tcunning at redhat.com">tcunning at redhat.com</a>
+ * @deprecated No longer used
  */
 public class DeliveryObservableLogger extends Observable {
 	private static DeliveryObservableLogger ref = null;

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java	2008-09-29 16:21:38 UTC (rev 23189)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java	2008-09-29 16:35:28 UTC (rev 23190)
@@ -20,12 +20,6 @@
 
 package org.jboss.soa.esb.listeners.message;
 
-import java.sql.Timestamp;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.jboss.system.ServiceMBeanSupport;
-
 /**
  * MessageCounter is a MBean implementation which keeps track of message data and 
  * metadata.
@@ -33,30 +27,14 @@
  * @author <a href="mailto:tcunning at redhat.com">tcunning at redhat.com</a> 
  *
  */
-public class MessageCounter extends ServiceMBeanSupport implements MessageCounterMBean, Observer {
+public class MessageCounter implements MessageCounterMBean {
 
-	private String lastSuccessfulMessageDate;
-	private String lastFailedMessageDate;
-	private int failedMessageCount;
-	private int successMessageCount;
-	private long totalProcessTime;
-	private int bytesProcessed;
-	private int bytesFailed;
-	
 	/**
-	 * Constructor.
-	 */
-	public MessageCounter () {
-		DeliveryObservableLogger.getInstance().addObserver(this);
-		resetCounts();
-	}
-	
-	/**
 	 * Get the total number of failed messages.
 	 * @return total number of failed messages
 	 */
 	public int getFailedMessageCount() {
-		return failedMessageCount;
+		return MessageCounterStatistics.getMessageCounterStatistics().getFailedMessageCount();
 	}
 
 	/**
@@ -64,7 +42,7 @@
 	 * @return time the last message was processed at
 	 */
 	public String getLastSuccessfulMessageDate() {
-		return lastSuccessfulMessageDate;
+		return MessageCounterStatistics.getMessageCounterStatistics().getLastSuccessfulMessageDate();
 	}
 	
 	/**
@@ -72,7 +50,7 @@
 	 * @return time the last message was processed at
 	 */
 	public String getLastFailedMessageDate() {
-		return lastFailedMessageDate;
+		return MessageCounterStatistics.getMessageCounterStatistics().getLastFailedMessageDate();
 	}
 	
 	/**
@@ -80,7 +58,7 @@
 	 * @return total number of processed messages
 	 */
 	public int getTotalMessageCount() {
-		return (failedMessageCount + successMessageCount);
+		return MessageCounterStatistics.getMessageCounterStatistics().getTotalMessageCount();
 	}
 
 	/**
@@ -88,64 +66,29 @@
 	 * @return total number of successfully processed messages
 	 */
 	public int getSuccessfulMessageCount() {
-		return successMessageCount;
+		return MessageCounterStatistics.getMessageCounterStatistics().getSuccessfulMessageCount();
 	}
 
 	public int getProcessedBytes() {
-		return bytesProcessed;
+		return MessageCounterStatistics.getMessageCounterStatistics().getProcessedBytes();
 	}
 	
 	public int getFailedBytes() {
-		return bytesFailed;
+		return MessageCounterStatistics.getMessageCounterStatistics().getFailedBytes();
 	}
-	
-	/**
-	 * Update the message counter based on the MessageStatusBean that is returned
-	 * from the observable.
-	 * @param o the observable object
-	 * @param arg the MessageStatusBean 
-	 */
-	public void update(Observable o, Object arg) {
-		MessageStatusBean msb = (MessageStatusBean) arg;
-		
-		if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_SENT)) {
-			successMessageCount++;
-			Timestamp ts = new Timestamp(msb.getMessageTime());
-			lastSuccessfulMessageDate = ts.toString();
-			bytesProcessed += msb.getMessageBytes();
-		} else if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_FAILED)) {
-			failedMessageCount++;
-			Timestamp ts = new Timestamp(msb.getMessageTime());
-			lastFailedMessageDate = ts.toString();
-			bytesFailed += msb.getMessageBytes();
-		}
-		
-		totalProcessTime += msb.getProcessTime();
-    }
 
 	/**
 	 * Get the average time to process a message.
 	 * @return average time to process a message 
 	 */
 	public Double getAverageSuccessTime() {
-		if ((successMessageCount) > 0) {
-			double ms = (double) totalProcessTime / (1000000 * successMessageCount);
-			return new Double(ms);
-		} else {
-			return null;
-		}
+		return MessageCounterStatistics.getMessageCounterStatistics().getAverageSuccessTime();
 	}
 	
 	/**
 	 * Reset the counts - this resets the totals and the last message time. 
 	 */
 	public void resetCounts() {
-		lastSuccessfulMessageDate = "---";
-		lastFailedMessageDate = "---";
-		successMessageCount = 0;
-		failedMessageCount = 0;
-		totalProcessTime = 0;
-		bytesProcessed = 0;
-		bytesFailed = 0;
+		MessageCounterStatistics.getMessageCounterStatistics().resetCounts();
 	}
 }

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java	2008-09-29 16:21:38 UTC (rev 23189)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterMBean.java	2008-09-29 16:35:28 UTC (rev 23190)
@@ -19,7 +19,6 @@
  */
 package org.jboss.soa.esb.listeners.message;
 
-import org.jboss.system.ServiceMBean;
 
 /**
  * MBean to represent simple message counting.    Stores the number of succesful and
@@ -29,7 +28,7 @@
  * @author <a href="mailto:tcunning at redhat.com">tcunning at redhat.com</a> 
  *
  */
-public interface MessageCounterMBean extends ServiceMBean {
+public interface MessageCounterMBean {
 	public int getTotalMessageCount();
 	
 	public int getSuccessfulMessageCount();

Copied: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterStatistics.java (from rev 23170, labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounter.java)
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterStatistics.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageCounterStatistics.java	2008-09-29 16:35:28 UTC (rev 23190)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+import java.sql.Timestamp;
+import java.util.Observable;
+
+/**
+ * Extraction of core from MessageCounter bean.
+ * 
+ * @author <a href="mailto:tcunning at redhat.com">tcunning at redhat.com</a> 
+ *
+ */
+public class MessageCounterStatistics {
+
+
+	private static final MessageCounterStatistics SINGLETON = new MessageCounterStatistics() ;
+	
+	private long lastSuccessfulMessageDate;
+	private long lastFailedMessageDate;
+	private int failedMessageCount;
+	private int successMessageCount;
+	private long totalProcessTime;
+	private int bytesProcessed;
+	private int bytesFailed;
+	
+	/**
+	 * Constructor.
+	 */
+	public MessageCounterStatistics() {
+		resetCounts();
+	}
+	
+	/**
+	 * Get the total number of failed messages.
+	 * @return total number of failed messages
+	 */
+	public synchronized int getFailedMessageCount() {
+		return failedMessageCount;
+	}
+
+	/**
+	 * Get the time the last message was processed at.
+	 * @return time the last message was processed at
+	 */
+	public synchronized String getLastSuccessfulMessageDate() {
+		return getDate(lastSuccessfulMessageDate);
+	}
+	
+	/**
+	 * Get the time the last message was processed at.
+	 * @return time the last message was processed at
+	 */
+	public synchronized String getLastFailedMessageDate() {
+		return getDate(lastFailedMessageDate);
+	}
+	
+	/**
+	 * Get the total number of processed messages, both successful and failed.
+	 * @return total number of processed messages
+	 */
+	public synchronized int getTotalMessageCount() {
+		return (failedMessageCount + successMessageCount);
+	}
+
+	/**
+	 * Get the total number of successfully processed messages.
+	 * @return total number of successfully processed messages
+	 */
+	public synchronized int getSuccessfulMessageCount() {
+		return successMessageCount;
+	}
+
+	public synchronized int getProcessedBytes() {
+		return bytesProcessed;
+	}
+	
+	public synchronized int getFailedBytes() {
+		return bytesFailed;
+	}
+	
+	/**
+	 * Update the message counters from the supplied MessageStatusBean.
+	 * The status selects the success or failure counters; the process time
+	 * is added to the running total whatever the status.
+	 * @param msb the MessageStatusBean describing the processed message
+	 */
+	public synchronized void update(MessageStatusBean msb) {
+		if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_SENT)) {
+			successMessageCount++;
+			// Keep the most recent time: only replace the stored date with a later one.
+			if (msb.getMessageTime() > lastSuccessfulMessageDate) {
+				lastSuccessfulMessageDate = msb.getMessageTime();
+			}
+			bytesProcessed += msb.getMessageBytes();
+		} else if (msb.getMessageStatus().equals(MessageStatusBean.MESSAGE_FAILED)) {
+			failedMessageCount++;
+			// Keep the most recent time: only replace the stored date with a later one.
+			if (msb.getMessageTime() > lastFailedMessageDate) {
+				lastFailedMessageDate = msb.getMessageTime();
+			}
+			bytesFailed += msb.getMessageBytes();
+		}
+		
+		totalProcessTime += msb.getProcessTime();
+    }
+
+	/**
+	 * Get the average time to process a message: total process time / (1000000 * successful count).
+	 * @return the average (presumably milliseconds, given nanosecond inputs), or null if no successes
+	 */
+	public synchronized Double getAverageSuccessTime() {
+		if (successMessageCount <= 0) {
+			return null;
+		}
+		// long literal: as an int product this overflows beyond 2147 successful messages
+		double ms = (double) totalProcessTime / (1000000L * successMessageCount);
+		return new Double(ms);
+	}
+	
+	/**
+	 * Reset the counts - this resets the totals and the last message time. 
+	 */
+	public synchronized void resetCounts() {
+		lastSuccessfulMessageDate = 0;
+		lastFailedMessageDate = 0;
+		successMessageCount = 0;
+		failedMessageCount = 0;
+		totalProcessTime = 0;
+		bytesProcessed = 0;
+		bytesFailed = 0;
+	}
+	
+	/**
+	 * Get the string representation of the time.
+	 * @param time The time.
+	 * @return The string representation.
+	 */
+	private static String getDate(final long time) {
+		if (time <= 0) {
+			return "---" ;
+		} else {
+			// not sure why timestamp is being used.
+			final Timestamp ts = new Timestamp(time) ;
+			return ts.toString() ;
+		}
+	}
+	
+	public static MessageCounterStatistics getMessageCounterStatistics() {
+		return SINGLETON ;
+	}
+}




More information about the jboss-svn-commits mailing list