[jboss-svn-commits] JBL Code SVN: r17967 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Jan 18 16:52:05 EST 2008


Author: mark.little at jboss.com
Date: 2008-01-18 16:52:05 -0500 (Fri, 18 Jan 2008)
New Revision: 17967

Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1500

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2008-01-18 20:42:45 UTC (rev 17966)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2008-01-18 21:52:05 UTC (rev 17967)
@@ -70,7 +70,7 @@
     public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
     
     private Map<String, Map< Integer, Message > > aggregatedMessageMap
-        = new ConcurrentHashMap< String, Map< Integer, Message > >();
+        = new ConcurrentHashMap< String, Map< Integer, Message > >();  // can probably change this to a vanilla Map now
     private TimeoutChecker _timeoutChecker=null;
     
     protected ConfigTree config;
@@ -120,7 +120,7 @@
     }
 
     public Map<String, Map<Integer, Message>> getAggregatedMessageMap() {
-        return aggregatedMessageMap;
+    		return aggregatedMessageMap;
     }
 
     /**
@@ -161,37 +161,40 @@
             // Set the timestamp on the message... used by the TimeoutChecker...
             message.getProperties().setProperty(SPLITTER_TIME_STAMP, aggrDetails.getSeriesTimestamp());
 
-            Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
-            if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
-                if (messageMap != null) {
-                    //add the message in if we don't already have it
-                    if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
-                         messageMap.put(aggrDetails.getMessageNumber(), message);
-                    }
-                    //Just going to send out what we have this far, which will remove this key
-                    //so subsequent messages with this uuId will be ignored
-                    message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
-                } else {
-                    logger.debug("Ignoring this message since we are already timedout on this message.");
-                    //ignoring this message
-                    message = null;
-                }
-            } else {
-                //Get the messageMap with this uuId, or create it if not found
-                if (messageMap == null) {
-                    messageMap = new ConcurrentHashMap<Integer, Message>();
-                    aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
-                }
-                if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
-                    logger.warn("Received duplicate message, ignoring it but this should not happen.");
-                } else {
-                    messageMap.put(aggrDetails.getMessageNumber(), message);
-                }
-                if (messageMap.size() == aggrDetails.getSeriesSize()) {
-                    message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
-                } else {
-                    message = null;
-                }
+            synchronized (aggregatedMessageMap)
+            {
+            	Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
+            	if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
+            		if (messageMap != null) {
+            			//add the message in if we don't already have it
+            			if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
+            				messageMap.put(aggrDetails.getMessageNumber(), message);
+            			}
+            			//Just going to send out what we have this far, which will remove this key
+            			//so subsequent messages with this uuId will be ignored
+            			message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+            		} else {
+            			logger.debug("Ignoring this message since we are already timedout on this message.");
+            			//ignoring this message
+            			message = null;
+            		}
+            	} else {
+            		//Get the messageMap with this uuId, or create it if not found
+            		if (messageMap == null) {
+            			messageMap = new ConcurrentHashMap<Integer, Message>();
+            			aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
+            		}
+            		if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
+            			logger.warn("Received duplicate message, ignoring it but this should not happen.");
+            		} else {
+            			messageMap.put(aggrDetails.getMessageNumber(), message);
+            		}
+            		if (messageMap.size() == aggrDetails.getSeriesSize()) {
+            			message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+            		} else {
+            			message = null;
+            		}
+            	}
             }
         } else {
             // Just aggregate the single message...
@@ -309,7 +312,11 @@
                 throw new ActionProcessingException("Message attachment serialization failure", e);
             }
         }
-        aggregatedMessageMap.remove(uuId);
+        
+        synchronized (aggregatedMessageMap)
+        {
+        	aggregatedMessageMap.remove(uuId);
+        }
         //TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
         //remove from the notificationMap if it is in there.
         
@@ -384,14 +391,15 @@
             while(running) {
                 //no need to check if no timeout is set
                 if (timeoutInMillies!=null) {
+                	// is a snapshot so no lock needed
                     for (Map<Integer, Message > messageMap : aggregatedMessageMap.values()) {
                         //Check the first message, they all have the same time stamp
                         Message message = messageMap.values().iterator().next();
                         long timeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
                         
                         if (isTimedOut(timeStamp)) {
-                            //We found a timed-out message. Let's go notify ourselves about by resending a message,
-                            //it if we haven't done so already
+                            //We found a timed-out message. Let's go notify ourselves about it by resending a message,
+                            //if we haven't done so already
                             List<String> aggregatorTag = getAggregatorTags(message);
                             if(aggregatorTag != null && !aggregatorTag.isEmpty()) {
                                 AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag.get(aggregatorTag.size() - 1));
@@ -405,7 +413,10 @@
                                 } catch(Throwable e) {
                                     logger.error("Error delivering timed out aggregation message to Dead Letter Service.", e);
                                 } finally {
-                                    aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
+                                	synchronized (aggregatedMessageMap)
+                                	{
+                                		aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
+                                	}
                                 }
                             }
                         }




More information about the jboss-svn-commits mailing list