[jboss-svn-commits] JBL Code SVN: r11361 - in labs/jbossesb/trunk/product/core: services/src/org/jboss/soa/esb/services/routing and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Apr 26 11:58:19 EDT 2007
Author: kurt.stam at jboss.com
Date: 2007-04-26 11:58:18 -0400 (Thu, 26 Apr 2007)
New Revision: 11361
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
Simplifying tags by using an array
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java 2007-04-26 15:43:52 UTC (rev 11360)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java 2007-04-26 15:58:18 UTC (rev 11361)
@@ -68,6 +68,7 @@
*/
public class Aggregator extends AbstractActionPipelineProcessor
{
+ public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
= new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
private TimeoutChecker _timeoutChecker=null;
@@ -129,33 +130,27 @@
* @return a aggregated message, or null if the aggregation has not been completed.
* @throws ActionProcessingException
*/
- public Message process(Message message) throws ActionProcessingException
+ @SuppressWarnings("unchecked")
+ public Message process(Message message) throws ActionProcessingException
{
- boolean aggregatorTagIsFound=true;
- int tagCounter=0;
- while (aggregatorTagIsFound) {
- if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
- aggregatorTagIsFound=false;
- tagCounter--;
+ ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+
+ if (aggregatorTags!=null && aggregatorTags.size()>0) {
+ String aggregatorTag = (String) aggregatorTags.get(aggregatorTags.size()-1);
+ //Removing the last tags and setting them as "the one in current use"
+ if (aggregatorTags.size()>1) {
+ aggregatorTags.remove(aggregatorTags.size()-1);
+ message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTags);
} else {
- tagCounter++;
+ message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG);
}
- }
- String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
- if (aggregatorTag!=null) {
-
- //Removing the tags and setting them as "the one in current use"
- message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG + tagCounter);
- message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTag);
- Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
- message.getProperties().remove(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
- message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP, splitterTimeStamp);
-
String[] tag = aggregatorTag.split(":");
String uuId = tag[0];
String messageNumber = tag[1];
int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
-
+ Long splitterTimeStamp = Long.valueOf(tag[3]);
+ message.getProperties().setProperty(SPLITTER_TIME_STAMP, splitterTimeStamp);
+
if (isTimedOut(message)) {
if (_aggregatedMessageMap.containsKey(uuId)) {
ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
@@ -222,20 +217,13 @@
boolean isFirstTime=true;
for (Message message : messageMap.values()) {
//Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+ //Only need to get it from the first message, should be the same for the others.
if (isFirstTime) {
- boolean aggregatorTagIsFound=true;
- int tagCounter=0;
- while (aggregatorTagIsFound) {
- if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
- aggregatorTagIsFound=false;
- } else {
- String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
- Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
- message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter, aggregatorTag);
- message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter, splitterTimeStamp);
- tagCounter++;
- }
+ ArrayList aggregatorTags = (ArrayList) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+ if (aggregatorTags!=null && aggregatorTags.size()>0) {
+ aggregatedMessage.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTags);
}
+ isFirstTime=false;
}
//Add the individual messages as attachments
aggregatedMessage.getAttachment().addItem(message);
@@ -257,7 +245,7 @@
*/
private boolean isTimedOut(Message message)
{
- long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP);
+ long splitterTimeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
if (timeoutInMillies!=null) {
long now = new Date().getTime();
long expiration = splitterTimeStamp + Long.valueOf(timeoutInMillies);
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-26 15:43:52 UTC (rev 11360)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-26 15:58:18 UTC (rev 11361)
@@ -23,6 +23,7 @@
import java.net.MalformedURLException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -75,30 +76,27 @@
* @param boolean - isSpitter, if true will Tag the messages for
* aggregation purposes.
*/
- public synchronized static void deliverMessages(
+ @SuppressWarnings("unchecked")
+ public synchronized static void deliverMessages(
Collection<String[]> destinations, Message message) throws MessageRouterException
{
String uuId = UUID.randomUUID().toString();
int counter=0;
- boolean aggregatorTagIsFound=true;
- int tagCounter=0;
- while (aggregatorTagIsFound) {
- if (message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter)==null){
- aggregatorTagIsFound=false;
- if (tagCounter>0) tagCounter--;
- } else {
- tagCounter++;
- }
- }
+
+
for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
{
//Only put tags on when routing to more then 1 destination
if (destinations.size()>1) {
- message.getProperties().setProperty(AGGEGRATOR_TAG + tagCounter, uuId + ":" + ++counter + ":" + destinations.size());
- message.getProperties().setProperty(SPLITTER_TIME_STAMP + tagCounter, new java.util.Date().getTime());
+ ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(AGGEGRATOR_TAG);
+ if (aggregatorTags==null) {
+ aggregatorTags = new ArrayList<String>();
+ }
+ String tag = uuId + ":" + ++counter + ":" + destinations.size() + ":" + new java.util.Date().getTime();
+ aggregatorTags.add(tag);
+ message.getProperties().setProperty(AGGEGRATOR_TAG, aggregatorTags);
if (logger.isDebugEnabled()) {
- String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter);
- logger.debug(AGGEGRATOR_TAG+ tagCounter + "=" + tag);
+ logger.debug(AGGEGRATOR_TAG+ "=" + tag);
}
}
More information about the jboss-svn-commits
mailing list