[jboss-svn-commits] JBL Code SVN: r17967 - labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Jan 18 16:52:05 EST 2008
Author: mark.little at jboss.com
Date: 2008-01-18 16:52:05 -0500 (Fri, 18 Jan 2008)
New Revision: 17967
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1500
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2008-01-18 20:42:45 UTC (rev 17966)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2008-01-18 21:52:05 UTC (rev 17967)
@@ -70,7 +70,7 @@
public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
private Map<String, Map< Integer, Message > > aggregatedMessageMap
- = new ConcurrentHashMap< String, Map< Integer, Message > >();
+ = new ConcurrentHashMap< String, Map< Integer, Message > >(); // can probably change this to a vanilla Map now
private TimeoutChecker _timeoutChecker=null;
protected ConfigTree config;
@@ -120,7 +120,7 @@
}
public Map<String, Map<Integer, Message>> getAggregatedMessageMap() {
- return aggregatedMessageMap;
+ return aggregatedMessageMap;
}
/**
@@ -161,37 +161,40 @@
// Set the timestamp on the message... used by the TimeoutChecker...
message.getProperties().setProperty(SPLITTER_TIME_STAMP, aggrDetails.getSeriesTimestamp());
- Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
- if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
- if (messageMap != null) {
- //add the message in if we don't already have it
- if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
- messageMap.put(aggrDetails.getMessageNumber(), message);
- }
- //Just going to send out what we have this far, which will remove this key
- //so subsequent messages with this uuId will be ignored
- message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
- } else {
- logger.debug("Ignoring this message since we are already timedout on this message.");
- //ignoring this message
- message = null;
- }
- } else {
- //Get the messageMap with this uuId, or create it if not found
- if (messageMap == null) {
- messageMap = new ConcurrentHashMap<Integer, Message>();
- aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
- }
- if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
- logger.warn("Received duplicate message, ignoring it but this should not happen.");
- } else {
- messageMap.put(aggrDetails.getMessageNumber(), message);
- }
- if (messageMap.size() == aggrDetails.getSeriesSize()) {
- message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
- } else {
- message = null;
- }
+ synchronized (aggregatedMessageMap)
+ {
+ Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
+ if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
+ if (messageMap != null) {
+ //add the message in if we don't already have it
+ if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
+ messageMap.put(aggrDetails.getMessageNumber(), message);
+ }
+ //Just going to send out what we have this far, which will remove this key
+ //so subsequent messages with this uuId will be ignored
+ message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+ } else {
+ logger.debug("Ignoring this message since we are already timedout on this message.");
+ //ignoring this message
+ message = null;
+ }
+ } else {
+ //Get the messageMap with this uuId, or create it if not found
+ if (messageMap == null) {
+ messageMap = new ConcurrentHashMap<Integer, Message>();
+ aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
+ }
+ if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
+ logger.warn("Received duplicate message, ignoring it but this should not happen.");
+ } else {
+ messageMap.put(aggrDetails.getMessageNumber(), message);
+ }
+ if (messageMap.size() == aggrDetails.getSeriesSize()) {
+ message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+ } else {
+ message = null;
+ }
+ }
}
} else {
// Just aggregate the single message...
@@ -309,7 +312,11 @@
throw new ActionProcessingException("Message attachment serialization failure", e);
}
}
- aggregatedMessageMap.remove(uuId);
+
+ synchronized (aggregatedMessageMap)
+ {
+ aggregatedMessageMap.remove(uuId);
+ }
//TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
//remove from the notificationMap if it is in there.
@@ -384,14 +391,15 @@
while(running) {
//no need to check if no timeout is set
if (timeoutInMillies!=null) {
+ // values() is a weakly consistent view of the ConcurrentHashMap (not a snapshot), so iterating it needs no lock
for (Map<Integer, Message > messageMap : aggregatedMessageMap.values()) {
//Check the first message, they all have the same time stamp
Message message = messageMap.values().iterator().next();
long timeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
if (isTimedOut(timeStamp)) {
- //We found a timed-out message. Let's go notify ourselves about by resending a message,
- //it if we haven't done so already
+ //We found a timed-out message. Let's go notify ourselves about it by resending a message,
+ //if we haven't done so already
List<String> aggregatorTag = getAggregatorTags(message);
if(aggregatorTag != null && !aggregatorTag.isEmpty()) {
AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag.get(aggregatorTag.size() - 1));
@@ -405,7 +413,10 @@
} catch(Throwable e) {
logger.error("Error delivering timed out aggregation message to Dead Letter Service.", e);
} finally {
- aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
+ synchronized (aggregatedMessageMap)
+ {
+ aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
+ }
}
}
}
More information about the jboss-svn-commits
mailing list