[jboss-svn-commits] JBL Code SVN: r16733 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: src/org/jboss/soa/esb/client and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Nov 21 11:52:43 EST 2007


Author: tfennelly
Date: 2007-11-21 11:52:43 -0500 (Wed, 21 Nov 2007)
New Revision: 16733

Added:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
Modified:
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java
   labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1204
http://jira.jboss.com/jira/browse/JBESB-1201
http://jira.jboss.com/jira/browse/JBESB-1330
http://jira.jboss.com/jira/browse/JBESB-1331
http://jira.jboss.com/jira/browse/JBESB-746

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,157 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions;
+
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+
+/**
+ * Aggregation Details.
+ * 
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AggregationDetails {
+
+    private String seriesUuid;
+    private int messageNumber = -1;
+    private int seriesSize = -1;
+    private long seriesTimestamp = -1;
+    private String splitId;
+
+    public AggregationDetails(String seriesUuid, int messageNumber, int seriesSize, long seriesTimestamp) {
+        AssertArgument.isNotNullAndNotEmpty(seriesUuid, "seriesUuid");
+        if(messageNumber < 1) {
+            throw new IllegalArgumentException("Invalid AggregatorDetails.  messageNumber < 1");
+        }
+        if(seriesSize < messageNumber) {
+            throw new IllegalArgumentException("Invalid AggregatorDetails.  seriesSize < messageNumber");
+        }
+        if(seriesTimestamp < 1) {
+            throw new IllegalArgumentException("Invalid AggregatorDetails.  timestamp < 1");
+        }
+
+        this.seriesUuid = seriesUuid;
+        this.messageNumber = messageNumber;
+        this.seriesSize = seriesSize;
+        this.seriesTimestamp = seriesTimestamp;
+    }
+
+    public AggregationDetails(String aggregatorTag) {
+        AssertArgument.isNotNullAndNotEmpty(aggregatorTag, "aggregatorTag");
+        String[] tokens = aggregatorTag.split(":");
+
+        if(tokens.length < 4) {
+            throw new IllegalArgumentException("Invalid Aggregator Tag.  Must have 4 tokens (colon separated).");
+        }
+
+        this.seriesUuid = tokens[0];
+        if(this.seriesUuid == null || this.seriesUuid.trim().equals("")) {
+            throw new IllegalArgumentException("Invalid Aggregator Tag: seriesUuid is blank.");
+        }
+        try {
+            this.messageNumber = Integer.parseInt(tokens[1]);
+            if(messageNumber < 1) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  messageNumber < 1");
+            }
+            this.seriesSize = Integer.parseInt(tokens[2]);
+            if(seriesSize < messageNumber) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  seriesSize < messageNumber");
+            }
+            this.seriesTimestamp = Long.parseLong(tokens[3]);
+            if(seriesTimestamp < 1) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  timestamp < 1");
+            }
+            if(tokens.length == 5) {
+                splitId = tokens[4];
+            }
+        } catch(NumberFormatException e) {
+            if(this.messageNumber == -1) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  'messageNumber' must be an int.");
+            } else if(this.seriesSize == -1) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  'seriesSize' must be an int.");
+            } else if(this.seriesTimestamp == -1) {
+                throw new IllegalArgumentException("Invalid Aggregator Tag.  'timestamp' must be an long.");
+            }
+        }
+    }
+
+    public String getSeriesUuid() {
+        return seriesUuid;
+    }
+
+    public int getSeriesSize() {
+        return seriesSize;
+    }
+
+    public int getMessageNumber() {
+        return messageNumber;
+    }
+
+    public long getSeriesTimestamp() {
+        return seriesTimestamp;
+    }
+
+    public String getSplitId() {
+        return splitId;
+    }
+
+    public void setSplitId(String splitId) {
+        this.splitId = splitId;
+    }
+
+    public boolean equals(Object obj) {
+        if (!(obj instanceof AggregationDetails)) {
+            // also covers null
+            return false;
+        } else if (obj == this) {
+            return true;
+        } else {
+            return toString().equals(obj.toString());
+        }
+    }
+
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    public String toString() {
+        assertDetailsSet();
+        if(splitId == null) {
+            return seriesUuid + ":" + messageNumber + ":" + seriesSize + ":" + seriesTimestamp;
+        } else {
+            return seriesUuid + ":" + messageNumber + ":" + seriesSize + ":" + seriesTimestamp + ":" + splitId;
+        }
+    }
+
+    private void assertDetailsSet() {
+        if(seriesUuid == null || seriesUuid.trim().equals("")) {
+            throw new IllegalStateException("AggregationDetails not set: 'seriesUuid' is null or empty.");
+        }
+        if(messageNumber == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'messageNumber' is not set.");
+        }
+        if(seriesSize == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'seriesSize' is not set.");
+        }
+        if(seriesTimestamp == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'timestamp' is not set.");
+        }
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -27,32 +27,28 @@
  */
 package org.jboss.soa.esb.actions;
 
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.Service;
-import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.client.ServiceInvoker;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
+import static org.jboss.soa.esb.client.ServiceInvoker.DEAD_LETTER_SERVICE_NAME;
+import static org.jboss.soa.esb.client.ServiceInvoker.INTERNAL_SERVICE_CATEGORY;
 import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.format.MessageFactory;
-import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
 
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * Simple Aggregator. The aggregator relies on 'aggregatorTags'. To puzzle the individual
  * back together. The aggregatorTag is set in the MessageRouter.deliverAsync() method. The aggregator
@@ -73,26 +69,24 @@
     public final static String AGGEGRATOR_TAG = "aggregatorTag";
     public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
     
-    private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
-        = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
+    private Map<String, Map< Integer, Message > > aggregatedMessageMap
+        = new ConcurrentHashMap< String, Map< Integer, Message > >();
     private TimeoutChecker _timeoutChecker=null;
-    private ArrayList<String> _notified = new ArrayList<String>();
     
     protected ConfigTree config;
     private Logger logger = Logger.getLogger(Aggregator.class);
     private Long timeoutInMillies=null;
-    private String serviceName;
-    private String serviceCategoryName;
-    private Registry registry;
-    
-	private Aggregator(){}
+    private Set<String> receivedSplits = new HashSet<String>();
+    private String splitId;
 
+    private Aggregator(){}
+
 	public Aggregator(ConfigTree config) throws ConfigurationException, RegistryException
-	{
-        
+	{        
         this.config = config;
-		checkMyParms();
-        registry = RegistryFactory.getRegistry();
+        timeoutInMillies = Long.valueOf(config.getRequiredAttribute("timeoutInMillies"));
+        logger.debug("Aggregator config:  timeoutInMillies=" + timeoutInMillies);
+        splitId = config.getAttribute("splitId");
     }
         
     /**
@@ -110,13 +104,17 @@
         _timeoutChecker.start();
     }
 
-            /**
-             * Destroy the action instance.
-             * <p/>
-             * This method is called prior to the release of the action instance.  All
-             * resources associated with this action instance should be released as the
-             * instance will no longer be used.
-             */
+    public Map<String, Map<Integer, Message>> getAggregatedMessageMap() {
+        return aggregatedMessageMap;
+    }
+
+    /**
+     * Destroy the action instance.
+     * <p/>
+     * This method is called prior to the release of the action instance.  All
+     * resources associated with this action instance should be released as the
+     * instance will no longer be used.
+     */
     public void destroy()
         throws ActionLifecycleException
     {
@@ -137,82 +135,135 @@
 	@SuppressWarnings("unchecked")
     public Message process(Message message) throws ActionProcessingException
 	{
-        ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
-		
-        if (aggregatorTags!=null && aggregatorTags.size()>0) {
-            String aggregatorTag = (String) aggregatorTags.get(aggregatorTags.size()-1);
-            //Removing the last tags and setting them as "the one in current use"
-            if (aggregatorTags.size()>1) {
-                aggregatorTags.remove(aggregatorTags.size()-1);
-                message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
-            } else {
-                message.getProperties().remove(Aggregator.AGGEGRATOR_TAG);
-            }
-            String[] tag = aggregatorTag.split(":");
-            String uuId = tag[0];
-            String messageNumber = tag[1];
-            int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
-            Long splitterTimeStamp = Long.valueOf(tag[3]);
-            message.getProperties().setProperty(SPLITTER_TIME_STAMP, splitterTimeStamp);
+        List<String> aggregatorTags = getAggregatorTags(message);
 
-            if (isTimedOut(message)) {
-                if (_aggregatedMessageMap.containsKey(uuId)) {
-                    ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
+        if (aggregatorTags != null && aggregatorTags.size() > 0) {
+            String aggregatorTag = aggregatorTags.get(aggregatorTags.size()-1);
+            AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag);
+
+            assertAggregationDetailsOK(aggrDetails);
+
+            // Set the timestamp on the message... used by the TimeoutChecker...
+            message.getProperties().setProperty(SPLITTER_TIME_STAMP, aggrDetails.getSeriesTimestamp());
+
+            Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
+            if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
+                if (messageMap != null) {
                     //add the message in if we don't already have it
-                    if (!messageMap.containsKey(uuId)) {
-                         messageMap.put(messageNumber, message);
+                    if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
+                         messageMap.put(aggrDetails.getMessageNumber(), message);
                     }
                     //Just going to send out what we have this far, which will remove this key
                     //so subsequent messages with this uuId will be ignored
-                    message = createAggregateMessage(uuId, _aggregatedMessageMap.get(uuId));
+                    message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
                 } else {
                     logger.debug("Ignoring this message since we are already timedout on this message.");
                     //ignoring this message
                     message = null;
                 }
             } else {
-                ConcurrentHashMap<String, Message> messageMap=null;
                 //Get the messageMap with this uuId, or create it if not found
-                if (_aggregatedMessageMap.containsKey(uuId)) {
-                    messageMap = _aggregatedMessageMap.get(uuId);
-                } else {
-                    messageMap = new ConcurrentHashMap<String, Message>();
+                if (messageMap == null) {
+                    messageMap = new ConcurrentHashMap<Integer, Message>();
+                    aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
                 }
-                if (messageMap.containsKey(messageNumber)) {
+                if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
                     logger.warn("Received duplicate message, ignoring it but this should not happen.");
                 } else {
-                    messageMap.put(messageNumber, message);
-                    _aggregatedMessageMap.put(uuId, messageMap);
-                } 
-                if (messageMap.size()==totalNumberOfMessages) {
-                    message = createAggregateMessage(uuId, messageMap);
+                    messageMap.put(aggrDetails.getMessageNumber(), message);
+                }
+                if (messageMap.size() == aggrDetails.getSeriesSize()) {
+                    message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
                 } else {
                     message = null;
                 }
             }
         } else {
-            throw new ActionProcessingException("Could not find an aggregator tag, so this message can not be aggregated.");
+            // Just aggregate the single message...
+            message = createAggregateMessage(message);
         }
-		return message;
+        
+        return message;
 	}
-	/**
-     * Obtains the timeoutInMillies to a number
-     * 
-     * @throws ConfigurationException
-	 */
-	private void checkMyParms() throws ConfigurationException 
-    {
-        timeoutInMillies     = Long.valueOf(config.getAttribute("timeoutInMillies"));
-        logger.debug("Aggregator config:  timeoutInMillies=" + timeoutInMillies);
-        serviceName          = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-        serviceCategoryName  = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+
+    private void assertAggregationDetailsOK(AggregationDetails aggrDetails) throws ActionProcessingException {
+        if(splitId != null) {
+            if(!splitId.equals(aggrDetails.getSplitId())) {
+                throw new ActionProcessingException("Invalid aggregation config on aggregator '" + config.getAttribute("action") + "' .  " +
+                        "This aggregator is configured " +
+                        "to only aggregate message with an aggregation 'spliId' of '" + splitId + "'. " +
+                        "The splitId on the received message is '" + aggrDetails.getSplitId() + "'. " +
+                        "A nested aggregation point may be missing, or may have been bypassed.");
+            }
+        } else {
+            // log a warning if this aggregator starts receiving messages with
+            // different splitIds.  This would suggest an error in the nested split configuration.
+            if(!receivedSplits.contains(aggrDetails.getSplitId())) {
+                receivedSplits.add(aggrDetails.getSplitId());
+            }
+
+            if(receivedSplits.size() > 1) {
+                logger.warn("Aggregator action '" + config.getAttribute("action") + "' has received " +
+                        "split messages from more multiple 'splitId' sources: " + receivedSplits + "\n" +
+                        "You may need to configure an intermediate/nested aggregator at " +
+                        "some point in the message flow.");
+            }
+
+        }
     }
 
+    public static List<String> getAggregatorTags(Message message) {
+        return (List<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
+    }
+
+    public static void setAggregatorTags(Message message, List<String> tags) {
+        if(tags != null) {
+            message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, tags);
+        } else {
+            message.getProperties().remove(Aggregator.AGGEGRATOR_TAG);
+        }
+    }
+
+    public static AggregationDetails getAggregatorDetails(Message message, int tagIndex) throws ActionProcessingException {
+        List<String> tags = getAggregatorTags(message);
+
+        if(tags == null || tags.isEmpty()) {
+            return null;
+        } else {
+            return new AggregationDetails(tags.get(tagIndex));
+        }
+    }
+
     public static void decorate(Message message) {
 
     }
 
     /**
+     * Aggregates a single message into an aggregated message.
+     * <p/>
+     * This method is called for messages that are recived without aggregation tags.
+     *
+     * @param message
+     * @return the aggregated message
+     */
+    @SuppressWarnings("unchecked")
+    public Message createAggregateMessage(Message message) throws ActionProcessingException {
+        // Create an aggregated message
+        Message aggregatedMessage = MessageFactory.getInstance().getMessage();
+
+        setAggregatorTags(message, null);
+        try {
+            aggregatedMessage.getAttachment().addItem(Util.serialize(message));
+        } catch (ParserConfigurationException e) {
+            throw new ActionProcessingException("Message attachment serialization failure", e);
+        } catch (IOException e) {
+            throw new ActionProcessingException("Message attachment serialization failure", e);
+        }
+
+        return aggregatedMessage;
+    }
+
+    /**
      * Aggregates the messages into 1 new message with an attachment for each message.
      * 
      * @param uuId
@@ -220,45 +271,65 @@
      * @return the aggregated message
      */
     @SuppressWarnings("unchecked")
-    private Message createAggregateMessage(String uuId, ConcurrentHashMap<String, Message> messageMap) 
-    {
-        //Create an aggregated message
+    public Message createAggregateMessage(String uuId, Map<Integer, Message> messageMap) throws ActionProcessingException {
+        // Create an aggregated message
         Message aggregatedMessage = MessageFactory.getInstance().getMessage();
-        boolean isFirstTime=true;
-        for (Message message : messageMap.values()) {
-            //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
-            //Only need to get it from the first message, should be the same for the others.
-            if (isFirstTime) {
-                ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
-                if (aggregatorTags!=null && aggregatorTags.size()>0) {
-                    aggregatedMessage.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
-                }
-                isFirstTime=false;
+
+        //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+        //Only need to get it from the first message, should be the same for the others.
+        List<String> aggregatedMessageTags = copyAggregationTags(messageMap);
+        setAggregatorTags(aggregatedMessage, aggregatedMessageTags);
+
+        for (Message attachmentMessage : messageMap.values()) {
+            //Add the individual messages as attachments
+            try {
+                // Clear the aggregation tags from the attachment message. Any future aggregation
+                // on the payload of these messages should be done within the context of the
+                // outer/aggregated message and it's tags.
+                setAggregatorTags(attachmentMessage, null);
+                aggregatedMessage.getAttachment().addItem(Util.serialize(attachmentMessage));
+            } catch (ParserConfigurationException e) {
+                throw new ActionProcessingException("Message attachment serialization failure", e);
+            } catch (IOException e) {
+                throw new ActionProcessingException("Message attachment serialization failure", e);
             }
-            //Add the individual messages as attachments
-            aggregatedMessage.getAttachment().addItem(message);
         }
-        _aggregatedMessageMap.remove(uuId);
+        aggregatedMessageMap.remove(uuId);
         //TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
         //remove from the notificationMap if it is in there.
-        _notified.remove(uuId);
         
         return aggregatedMessage;
     }
+
+    private List<String> copyAggregationTags(Map<Integer, Message> messageMap) {
+        // Get the tags from the first message...
+        List<String> nestedAggregationTags = getAggregatorTags(messageMap.values().iterator().next());
+
+        if(nestedAggregationTags != null && nestedAggregationTags.size() > 1) {
+            // clone the tags, just incase they get zapped from the source list...
+            List<String> aggregatedMessageTags = new ArrayList<String>();
+
+            aggregatedMessageTags.addAll(nestedAggregationTags);
+            // remove the last one because that's related to the nested aggregation...
+            aggregatedMessageTags.remove(aggregatedMessageTags.size() - 1);
+
+            return aggregatedMessageTags;
+        }
+
+        return null;
+    }
+
     /**
      * If the aggregation process is complete then return true. This depends on the configuration. 
      * 
-     * @param totalNumberOfMessages
-     * @param splitterTimeStamp
-     * @param messageMap
-     * @return
+     * @param splitterTimeStamp The splitter timestamp.
+     * @return True if the message is timed out, otherwise false.
      */
-    private boolean isTimedOut(Message message) 
+    private boolean isTimedOut(long splitterTimeStamp)
     {
-        long splitterTimeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
         if (timeoutInMillies!=null) {
             long now = new Date().getTime();
-            long expiration = splitterTimeStamp + Long.valueOf(timeoutInMillies);
+            long expiration = splitterTimeStamp + timeoutInMillies;
             if (logger.isDebugEnabled()) {
                 DateFormat dateFormat = DateFormat.getTimeInstance();
                 logger.debug("Current time=" + dateFormat.format(new Date(now)) 
@@ -274,7 +345,8 @@
         return false;
     }
     /**
-     * Checks for message that are timed out. If we find that one we notify ourselves about it by resending the message.
+     * Checks for message that are timed out. If we find that one, drop it in the DLQ and delete
+     * it from the map.
      * 
      * @author kstam
      *
@@ -282,39 +354,43 @@
     class TimeoutChecker extends Thread {
         private final Lock terminateLock = new ReentrantLock() ;
         private final Condition terminateCondition = terminateLock.newCondition() ;
-        private boolean terminated ; 
-        
+        private boolean terminated ;
+        ServiceInvoker dlQueueInvoker;
+
         @SuppressWarnings("unchecked")
         public void run() {
+            try {
+                dlQueueInvoker = new ServiceInvoker(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+            } catch (MessageDeliverException e) {
+                logger.error("Unable to initialise Dead Letter Channel Service Invoker for Aggregation timeout checker. Not using Dead Letter Channel.", e);
+            }
+
             boolean running = true ;
             while(running) {
-            	Service theService = new Service(serviceCategoryName, serviceName);
-            	ServiceInvoker theServiceInvoker = null;
-            	
                 //no need to check if no timeout is set
                 if (timeoutInMillies!=null) {
-                    for (ConcurrentHashMap< String, Message > messageMap : _aggregatedMessageMap.values()) {
+                    for (Map<Integer, Message > messageMap : aggregatedMessageMap.values()) {
                         //Check the first message, they all have the same time stamp
                         Message message = messageMap.values().iterator().next();
-                        if (isTimedOut(message)) {
+                        long timeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
+                        
+                        if (isTimedOut(timeStamp)) {
                             //We found a timed-out message. Let's go notify ourselves about by resending a message,
                             //it if we haven't done so already
-                            ArrayList<String> aggregatorTag = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
-                            String uuId = aggregatorTag.get(0);
-                            if (!_notified.contains(uuId)) {
-                                _notified.add(uuId);
-                                logger.debug("Found timeout message.");
+                            List<String> aggregatorTag = getAggregatorTags(message);
+                            if(aggregatorTag != null && !aggregatorTag.isEmpty()) {
+                                AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag.get(aggregatorTag.size() - 1));
+
                                 try {
-                                	if (theServiceInvoker == null)
-                                    	theServiceInvoker = new ServiceInvoker(theService);
-                                	
-                                	theServiceInvoker.deliverAsync(message);
-                                } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
-                                    //If we can't notify then drop this data
-                                    logger.debug("Deleting data for message series with uuId=" + uuId);
-                                    _notified.remove(uuId);
-                                    _aggregatedMessageMap.remove(uuId);
+                                    logger.info("Deleting message aggregation series: " + aggrDetails.getSeriesUuid());
+                                    if(dlQueueInvoker != null) {
+                                        Message aggregateMessage = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+                                        dlQueueInvoker.deliverAsync(aggregateMessage);
+                                    }
+                                } catch(Throwable e) {
+                                    logger.error("Error delivering timed out aggregation message to Dead Letter Service.", e);
+                                } finally {
+                                    aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
                                 }
                             }
                         }
@@ -332,7 +408,7 @@
                 }
             }
         }
-            
+
         public void terminate() {
             terminateLock.lock() ;
             try

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java	2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -31,6 +31,7 @@
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.client.MessageMulticaster;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
@@ -68,6 +69,8 @@
      */
     public void initialise() throws ActionLifecycleException
     {
+        messageMulticaster = new MessageMulticaster(_config.getAttribute("action", "%unset%"));
+
         ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
         if (null == destList || destList.length < 1)
         {
@@ -92,7 +95,7 @@
 
     protected ConfigTree _config;
 
-    protected org.jboss.soa.esb.client.MessageMulticaster messageMulticaster = new org.jboss.soa.esb.client.MessageMulticaster();
+    protected MessageMulticaster messageMulticaster;
 
     protected static Logger _logger = Logger.getLogger(StaticWiretap.class);
 }

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java	2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -19,13 +19,14 @@
  */
 package org.jboss.soa.esb.client;
 
-import org.jboss.soa.esb.message.Message;
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
 import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.actions.AggregationDetails;
 import org.jboss.soa.esb.actions.Aggregator;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.internal.soa.esb.assertion.AssertArgument;
-import org.apache.log4j.Logger;
 
 import java.util.*;
 
@@ -48,8 +49,24 @@
     private static Logger logger = Logger.getLogger(MessageMulticaster.class);
 
     private Map<Service, ServiceInvoker> invokers = new LinkedHashMap<Service, ServiceInvoker>();
+    private String splitId;
 
     /**
+     * Public default constructor.
+     */
+    public MessageMulticaster() {        
+    }
+
+    /**
+     * Public default constructor.
+     * @param splitId Split ID for this multicaster.
+     */
+    public MessageMulticaster(String splitId) {
+        AssertArgument.isNotNullAndNotEmpty(splitId, "splitId");
+        this.splitId = splitId;
+    }
+
+    /**
      * Add a message recipient Service.
      * @param service Recipient service for receipt of messages from this miltcaster instance.
      * @throws RegistryException Failed to lookup Service endpoint.
@@ -102,26 +119,23 @@
      * @throws MessageDeliverException Failed to deliver message to endpoint.
      */
     public void sendToSubset(Message message, List<Service> recipients) throws RegistryException, MessageDeliverException {
-	if (recipients.isEmpty())
-	    logger.warn("MessageMulticaster.sendToSubset: empty recipients list!");
-	else
-	{
-            String uuId = UUID.randomUUID().toString();
-            ArrayList<String> aggregatorTags = new ArrayList<String>();
+        if (recipients.isEmpty()) {
+            logger.warn("MessageMulticaster.sendToSubset: empty recipients list!");
+        } else{
+            String seriesUUID = UUID.randomUUID().toString();
+            long seriesTimestamp = System.currentTimeMillis();
             int recipientCount = recipients.size();
-            long timestamp = System.currentTimeMillis();
-    
+
             for(int i = 0; i < recipientCount; i++) {
                 Service recipient = recipients.get(i);
                 ServiceInvoker invoker = getInvoker(recipient);
-                String tag = uuId + ":" + (i + 1) + ":" + recipientCount + ":" + timestamp;
-    
-                aggregatorTags.add(tag);
-                message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
-                if (logger.isDebugEnabled()) {
-                    logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + tag);
+
+                if(recipientCount > 1) {
+                    // Only add aggregation info if we're splitting the message i.e. sending it to
+                    // more than 1 recipient...
+                    addAggregationDetails(message, seriesUUID, recipientCount, seriesTimestamp, i + 1);
                 }
-    
+
                 if(invoker == null) {
                     logger.error("Service '" + recipient + "' is not in recipient list.  Delivering message to Dead Letter Channel.");
                     ServiceInvoker.deliverToDeadLetterService(message);
@@ -134,10 +148,33 @@
                     }
                 }
             }
-	}
+        }
     }
 
-    private org.jboss.soa.esb.client.ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
+    private void addAggregationDetails(Message message, String uuId, int recipientCount, long seriesTimestamp, int messageIndex) {
+        AggregationDetails aggrDetails = new AggregationDetails(uuId, messageIndex, recipientCount, seriesTimestamp);
+        ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
+
+        // This is useful during aggregation - as a way of id'ing where the split occurred.
+        aggrDetails.setSplitId(splitId);
+
+        if(aggregatorTags == null) {
+            aggregatorTags = new ArrayList<String>();
+            message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
+        }
+
+        if(messageIndex > 1) {
+            // remove the tag string for the last recipient...
+            aggregatorTags.remove(aggregatorTags.size() - 1);
+        }
+        aggregatorTags.add(aggrDetails.toString());
+
+        if (logger.isDebugEnabled()) {
+            logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + aggrDetails);
+        }
+    }
+
+    private ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
         ServiceInvoker invoker = invokers.get(recipient);
 
         // We lazilly create the invokers...

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions;
+
+import junit.framework.TestCase;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AggregationDetailsUnitTest extends TestCase {
+
+    public void test_multi_args() throws MessageDeliverException {
+        test_invalid_multi_args("", 1, 1, 1, "null or empty 'seriesUuid' arg in method call.");
+        test_invalid_multi_args("x", -1, 1, 1, "Invalid AggregatorDetails.  messageNumber < 1");
+        test_invalid_multi_args("x", 1, -1, 1, "Invalid AggregatorDetails.  seriesSize < messageNumber");
+        test_invalid_multi_args("x", 1, 1, -1, "Invalid AggregatorDetails.  timestamp < 1");
+
+        // This should work...
+        new AggregationDetails("x", 1, 1, 1);
+    }
+
+    private void test_invalid_multi_args(String seriesUuid, int messageNumber, int seriesSize, long seriesTimestamp, String errorMessage) {
+        try {
+            new AggregationDetails(seriesUuid, messageNumber, seriesSize, seriesTimestamp);
+            fail("Expected IllegalArgumentException/MessageDeliverException");
+        } catch(IllegalArgumentException e) {
+            assertEquals(errorMessage, e.getMessage());
+        }
+    }
+
+    public void test_tag_arg() throws MessageDeliverException {
+        test_invalid_tag_arg("", "null or empty 'aggregatorTag' arg in method call.");
+        test_invalid_tag_arg("1:1:1", "Invalid Aggregator Tag.  Must have 4 tokens (colon separated).");
+        test_invalid_tag_arg(":1:1:1", "Invalid Aggregator Tag: seriesUuid is blank.");
+        test_invalid_tag_arg("xxx:x:1:1", "Invalid Aggregator Tag.  'messageNumber' must be an int.");
+        test_invalid_tag_arg("xxx:1:x:1", "Invalid Aggregator Tag.  'seriesSize' must be an int.");
+        test_invalid_tag_arg("xxx:1:1:x", "Invalid Aggregator Tag.  'timestamp' must be an long.");
+        test_invalid_tag_arg("xxx:-1:1:1", "Invalid Aggregator Tag.  messageNumber < 1");
+        test_invalid_tag_arg("xxx:1:-1:1", "Invalid Aggregator Tag.  seriesSize < messageNumber");
+        test_invalid_tag_arg("xxx:1:1:-1", "Invalid Aggregator Tag.  timestamp < 1");
+
+        // This should work...
+        new AggregationDetails("xxx:1:1:1");        
+    }
+
+    private void test_invalid_tag_arg(String tag, String errorMessage) {
+        try {
+            new AggregationDetails(tag);
+            fail("Expected IllegalArgumentException/MessageDeliverException");
+        } catch(IllegalArgumentException e) {
+            assertEquals(errorMessage, e.getMessage());
+        }
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,280 @@
+/*
+	Milyn - Copyright (C) 2006
+
+	This library is free software; you can redistribute it and/or
+	modify it under the terms of the GNU Lesser General Public
+	License (version 2.1) as published by the Free Software
+	Foundation.
+
+	This library is distributed in the hope that it will be useful,
+	but WITHOUT ANY WARRANTY; without even the implied warranty of
+	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+	See the GNU Lesser General Public License for more details:
+	http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+
+import java.util.Map;
+
+/**
+ * Tests for JBESB-1201 re timeout management.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ *            |----- service1 -----|
+ *            |                    |
+ * splitter --|                    |-- aggregator
+ *            |                    |
+ *            |----- service2 -----|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class JBESB_1201_UnitTest extends TestCase {
+
+    private TestCourier service1Courier;
+    private TestCourier service2Courier;
+    private TestCourier aggregatorCourier;
+    private TestCourier dlqServiceCourier;
+
+    private StaticRouter splitter;
+    private StaticRouter service1;
+    private StaticRouter service2;
+    private Aggregator aggregator;
+
+    protected void setUp() throws Exception {
+        MockCourierFactory.install();
+        MockRegistry.install();
+
+        service1Courier = new TestCourier();
+        service2Courier = new TestCourier();
+        aggregatorCourier = new TestCourier();
+        dlqServiceCourier = new TestCourier();
+
+        MockRegistry.register("test", "service1", service1Courier);
+        MockRegistry.register("test", "service2", service2Courier);
+        MockRegistry.register("test", "aggregator", aggregatorCourier);
+        MockRegistry.register(ServiceInvoker.INTERNAL_SERVICE_CATEGORY, ServiceInvoker.DEAD_LETTER_SERVICE_NAME, dlqServiceCourier);
+
+        initaliseServices();
+    }
+
+    private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+        ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-01.xml"));
+        ConfigTree splitterConfig = esbConfig.getActionConfig("null-listener", "splitter1-action");
+        ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+        ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+        ConfigTree aggregatorConfig = esbConfig.getActionConfig("null-listener", "aggregator-config");
+
+        splitter = new StaticRouter(splitterConfig);
+        splitter.initialise();
+        service1 = new StaticRouter(service1Config);
+        service1.initialise();
+        service2 = new StaticRouter(service2Config);
+        service2.initialise();
+        aggregator = new Aggregator(aggregatorConfig);
+        aggregator.initialise();
+    }
+
+    protected void tearDown() throws Exception {
+        splitter.destroy();
+        service1.destroy();
+        service2.destroy();
+        aggregator.destroy();
+        MockRegistry.uninstall();
+        MockCourierFactory.uninstall();
+    }
+
+    public void test_not_timed_out() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+
+        // Get the aggregators message map and make sure it's empty...
+        Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Manually deliver the message to the splitter service...
+        splitter.process(messageIn);
+        AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+        assertNotNull(service1Message);
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // Manually deliver the message in service2Courier into service2...
+        service2.process(service2Courier.messages.get(0));
+
+        // 2 messages should arrive at the aggregatorCourier...
+        assertEquals(2, aggregatorCourier.messages.size());
+
+        // Aggregators message map should be empty...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Pump the 1st message into the aggregater...
+        aggregator.process(aggregatorCourier.messages.get(0));
+        assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+        // Pump the 2nd (last) message into the aggregater...
+        Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
+
+        assertNotNull(aggregateMessage);
+        assertEquals(2, aggregateMessage.getAttachment().getUnnamedCount());
+
+        // Make sure the aggregators message map is empty again...
+        assertTrue(aggrMessageMap.isEmpty());
+    }
+
+    public void test_timed_out_non_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+
+        // Get the aggregators message map and make sure it's empty...
+        Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Manually deliver the message to the splitter service...
+        splitter.process(messageIn);
+        AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+        assertNotNull(service1Message);
+
+        // The aggregator timeout is 2000ms... sleep here for 3000ms to force all
+        // messages delivered to the aggregator to be timed out - should all be
+        // ignored...
+        Thread.sleep(3000);
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // Manually deliver the message in service2Courier into service2...
+        service2.process(service2Courier.messages.get(0));
+
+        // 2 messages should arrive at the aggregatorCourier...
+        assertEquals(2, aggregatorCourier.messages.size());
+
+        // Aggregators message map should be empty...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Pump the 1st message into the aggregater...
+        aggregator.process(aggregatorCourier.messages.get(0));
+
+        // Aggregators message map should still be empty because the message should
+        // have been ignored...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Pump the 2nd (last) message into the aggregater...
+        aggregator.process(aggregatorCourier.messages.get(1));
+
+        // Aggregators message map should still be empty because the message should
+        // have been ignored...
+        assertTrue(aggrMessageMap.isEmpty());
+    }
+
+    public void test_timed_out_some_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+
+        // Get the aggregators message map and make sure it's empty...
+        Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Manually deliver the message to the splitter service...
+        splitter.process(messageIn);
+        AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+        assertNotNull(service1Message);
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // 1 messages should arrive at the aggregatorCourier...
+        assertEquals(1, aggregatorCourier.messages.size());
+
+        // Aggregators message map should be empty...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Pump the 1st message into the aggregater and make sure it's
+        // details are added properly to the map...
+        aggregator.process(aggregatorCourier.messages.get(0));
+        assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+        // The deadServiceCourier should be empty...
+        assertEquals(0, dlqServiceCourier.messages.size());
+
+        // The aggregator timeout is 2000ms... sleep here for 3000ms to force the 2nd
+        // message delivered to the aggregator to be timed out...
+        Thread.sleep(3000);
+
+        // Should have timed out and "notified" on the first message...
+        assertEquals(1, dlqServiceCourier.messages.size());
+        // Aggregators message map should be empty again...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Manually deliver the message in service2Courier into service2...
+        service2.process(service2Courier.messages.get(0));
+
+        // 2 messages should be at the aggregatorCourier...
+        assertEquals(2, aggregatorCourier.messages.size());
+
+        // Pump the 2nd message into the aggregater...
+        Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
+        assertNull(aggregateMessage);
+
+        // Aggregators message map should be empty because the message should
+        // have been ignored...
+        assertTrue(aggrMessageMap.isEmpty());
+    }
+
+    public void test_timeoutchecker() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+
+        // Get the aggregators message map and make sure it's empty...
+        Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Manually deliver the message to the splitter service...
+        splitter.process(messageIn);
+        AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+        assertNotNull(service1Message);
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // 1 messages should arrive at the aggregatorCourier...
+        assertEquals(1, aggregatorCourier.messages.size());
+
+        // Aggregators message map should be empty...
+        assertTrue(aggrMessageMap.isEmpty());
+
+        // Pump the message into the aggregater and make sure it's
+        // details are added properly to the map...
+        aggregator.process(aggregatorCourier.messages.get(0));
+        assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+        // The aggregator timeout is 2000ms... sleep here for 3000ms...
+        Thread.sleep(4000);
+
+        // Aggregators message map should be empty because the message should
+        // have timed out...
+        assertTrue("Message didn't get removed from map after timeout", aggrMessageMap.isEmpty());
+    }
+
+    private void assertService1MessageDetailsOK(Map<String, Map<Integer, Message>> aggrMessageMap, AggregationDetails service1Message) {
+        // Aggregators message map should have 1 entry for the above message...
+        assertEquals(1, aggrMessageMap.size());
+        Map<Integer, Message> messageSeries = aggrMessageMap.get(service1Message.getSeriesUuid());
+        assertNotNull(messageSeries);
+        assertTrue(messageSeries.get(service1Message.getMessageNumber()) != null);
+    }
+}
\ No newline at end of file


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,168 @@
+/*
+	Milyn - Copyright (C) 2006
+
+	This library is free software; you can redistribute it and/or
+	modify it under the terms of the GNU Lesser General Public
+	License (version 2.1) as published by the Free Software
+	Foundation.
+
+	This library is distributed in the hope that it will be useful,
+	but WITHOUT ANY WARRANTY; without even the implied warranty of
+	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+	See the GNU Lesser General Public License for more details:
+	http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+
+import java.util.List;
+
+/**
+ * Tests for JBESB-1204 and JBESB-1331 re aggrgation tag infos.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ *            |----- service1 -----|
+ *            |                    |
+ * splitter --|                    |-- aggregator
+ *            |                    |
+ *            |----- service2 -----|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class JBESB_1204_1331_UnitTest extends TestCase {
+
+    private TestCourier service1Courier;
+    private TestCourier service2Courier;
+    private TestCourier aggregatorCourier;
+
+    private StaticRouter splitter;
+    private StaticRouter service1;
+    private StaticRouter service2;
+    private Aggregator aggregator;
+
+    protected void setUp() throws Exception {
+        MockCourierFactory.install();
+        MockRegistry.install();
+
+        service1Courier = new TestCourier();
+        service2Courier = new TestCourier();
+        aggregatorCourier = new TestCourier();
+
+        MockRegistry.register("test", "service1", service1Courier);
+        MockRegistry.register("test", "service2", service2Courier);
+        MockRegistry.register("test", "aggregator", aggregatorCourier);
+
+        initaliseServices();
+    }
+
+    private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+        ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-01.xml"));
+        ConfigTree splitterConfig = esbConfig.getActionConfig("null-listener", "splitter1-action");
+        ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+        ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+        ConfigTree aggregatorConfig = esbConfig.getActionConfig("null-listener", "aggregator-config");
+
+        splitter = new StaticRouter(splitterConfig);
+        splitter.initialise();
+        service1 = new StaticRouter(service1Config);
+        service1.initialise();
+        service2 = new StaticRouter(service2Config);
+        service2.initialise();
+        aggregator = new Aggregator(aggregatorConfig);
+        aggregator.initialise();
+    }
+
+    protected void tearDown() throws Exception {
+        splitter.destroy();
+        service1.destroy();
+        service2.destroy();
+        aggregator.destroy();
+        MockRegistry.uninstall();
+        MockCourierFactory.uninstall();
+    }
+
+    public void test() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+        List<String> aggrTags;
+        AggregationDetails aggrDetailsS1;
+        AggregationDetails aggrDetailsS2;
+        AggregationDetails aggrDetailsAggrM1;
+        AggregationDetails aggrDetailsAggrM2;
+
+        // Manually deliver the message to the splitter service...
+        splitter.process(messageIn);
+        
+        // There should be 2 msssages delivered.. one to service1 and one service2...
+        assertEquals(1, service1Courier.messages.size());
+        assertEquals(1, service2Courier.messages.size());
+
+        // The 2 messages should each have 1 aggregation info string (no more)...
+        aggrTags = Aggregator.getAggregatorTags(service1Courier.messages.get(0));
+        assertEquals(1, aggrTags.size());
+        aggrDetailsS1 = new AggregationDetails(aggrTags.get(0));
+        aggrTags = Aggregator.getAggregatorTags(service2Courier.messages.get(0));
+        assertEquals(1, aggrTags.size());
+        aggrDetailsS2 = new AggregationDetails(aggrTags.get(0));        
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // Manually deliver the message in service2Courier into service2...
+        service2.process(service2Courier.messages.get(0));
+
+        // 2 messages should arrive at the aggregatorCourier...
+        assertEquals(2, aggregatorCourier.messages.size());
+
+        // Extract aggr details from first message...
+        aggrTags = Aggregator.getAggregatorTags(aggregatorCourier.messages.get(0));
+        assertEquals(1, aggrTags.size());
+        aggrDetailsAggrM1 = new AggregationDetails(aggrTags.get(0));
+
+        // Extract aggr details from second message...
+        aggrTags = Aggregator.getAggregatorTags(aggregatorCourier.messages.get(1));
+        assertEquals(1, aggrTags.size());
+        aggrDetailsAggrM2 = new AggregationDetails(aggrTags.get(0));
+
+        // make sure all the UUIDs match...
+        assertEquals(aggrDetailsS1.getSeriesUuid(), aggrDetailsS2.getSeriesUuid()); 
+        assertEquals(aggrDetailsS2.getSeriesUuid(), aggrDetailsAggrM1.getSeriesUuid());
+        assertEquals(aggrDetailsAggrM1.getSeriesUuid(), aggrDetailsAggrM2.getSeriesUuid()); 
+
+        // make sure all the timestamps match...
+        assertEquals(aggrDetailsS1.getSeriesTimestamp(), aggrDetailsS2.getSeriesTimestamp());
+        assertEquals(aggrDetailsS2.getSeriesTimestamp(), aggrDetailsAggrM1.getSeriesTimestamp());
+        assertEquals(aggrDetailsAggrM1.getSeriesTimestamp(), aggrDetailsAggrM2.getSeriesTimestamp()); 
+
+        // make sure the series size matches...
+        assertEquals(aggrDetailsS1.getSeriesSize(), aggrDetailsS2.getSeriesSize());
+        assertEquals(aggrDetailsS2.getSeriesSize(), aggrDetailsAggrM1.getSeriesSize());
+        assertEquals(aggrDetailsAggrM1.getSeriesSize(), aggrDetailsAggrM2.getSeriesSize());
+
+        // make sure the message num's don't match, and that they're 1 or 2...
+        assertNotSame(aggrDetailsAggrM1.getMessageNumber(), aggrDetailsAggrM2.getMessageNumber());
+        assertTrue(aggrDetailsAggrM1.getMessageNumber() == 1 || aggrDetailsAggrM1.getMessageNumber() == 2);
+        assertTrue(aggrDetailsAggrM2.getMessageNumber() == 1 || aggrDetailsAggrM2.getMessageNumber() == 2);
+
+        // make sure the splitId matches...
+        assertEquals("splitter1-action", aggrDetailsS1.getSplitId());
+        assertEquals(aggrDetailsS1.getSplitId(), aggrDetailsS2.getSplitId());
+        assertEquals(aggrDetailsS2.getSplitId(), aggrDetailsAggrM1.getSplitId());
+        assertEquals(aggrDetailsAggrM1.getSplitId(), aggrDetailsAggrM2.getSplitId());
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,237 @@
+/*
+	Milyn - Copyright (C) 2006
+
+	This library is free software; you can redistribute it and/or
+	modify it under the terms of the GNU Lesser General Public
+	License (version 2.1) as published by the Free Software
+	Foundation.
+
+	This library is distributed in the hope that it will be useful,
+	but WITHOUT ANY WARRANTY; without even the implied warranty of
+	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+	See the GNU Lesser General Public License for more details:
+	http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for JBESB-1204 and JBESB-1331.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ *             |------------------ service1 ----------------|
+ *             |                                            |
+ * splitter1 --|                                            |-- aggregator2 --
+ *             |               |--service2--|               |
+ *             |-- splitter2 --|            |--aggregator1--|
+ *                             |--service3--|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class Nested_Splits_UnitTest extends TestCase {
+
+    private TestCourier splitter2Courier; // no need for a courier for splitter1
+    private TestCourier service1Courier;
+    private TestCourier service2Courier;
+    private TestCourier service3Courier;
+    private TestCourier aggregator1Courier;
+    private TestCourier aggregator2Courier;
+
+    private StaticRouter splitter1;
+    private StaticRouter splitter2;
+    private StaticRouter service1;
+    private StaticRouter service2;
+    private StaticRouter service3;
+    private Aggregator aggregator1;
+    private Aggregator aggregator2;
+    private Aggregator aggregator3;
+
+    protected void setUp() throws Exception {
+        MockCourierFactory.install();
+        MockRegistry.install();
+
+        splitter2Courier = new TestCourier();
+        service1Courier = new TestCourier();
+        service2Courier = new TestCourier();
+        service3Courier = new TestCourier();
+        aggregator1Courier = new TestCourier();
+        aggregator2Courier = new TestCourier();
+
+        MockRegistry.register("test", "splitter2", splitter2Courier);
+        MockRegistry.register("test", "service1", service1Courier);
+        MockRegistry.register("test", "service2", service2Courier);
+        MockRegistry.register("test", "service3", service3Courier);
+        MockRegistry.register("test", "aggregator1", aggregator1Courier);
+        MockRegistry.register("test", "aggregator2", aggregator2Courier);
+
+        initaliseServices();
+    }
+
+    private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+        ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-02.xml"));
+        ConfigTree splitter1Config = esbConfig.getActionConfig("null-listener", "splitter1-action");
+        ConfigTree splitter2Config = esbConfig.getActionConfig("null-listener", "splitter2-action");
+        ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+        ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+        ConfigTree service3Config = esbConfig.getActionConfig("null-listener", "service3-config");
+        ConfigTree aggregator1Config = esbConfig.getActionConfig("null-listener", "aggregator1-config");
+        ConfigTree aggregator2Config = esbConfig.getActionConfig("null-listener", "aggregator2-config");
+        ConfigTree aggregator3Config = esbConfig.getActionConfig("null-listener", "aggregator3-config");
+
+        splitter1 = new StaticRouter(splitter1Config);
+        splitter1.initialise();
+        splitter2 = new StaticRouter(splitter2Config);
+        splitter2.initialise();
+        service1 = new StaticRouter(service1Config);
+        service1.initialise();
+        service2 = new StaticRouter(service2Config);
+        service2.initialise();
+        service3 = new StaticRouter(service3Config);
+        service3.initialise();
+        aggregator1 = new Aggregator(aggregator1Config);
+        aggregator1.initialise();
+        aggregator2 = new Aggregator(aggregator2Config);
+        aggregator2.initialise();
+        aggregator3 = new Aggregator(aggregator3Config);
+        aggregator3.initialise();
+    }
+
+    protected void tearDown() throws Exception {
+        splitter1.destroy();
+        service1.destroy();
+        service2.destroy();
+        aggregator1.destroy();
+        MockRegistry.uninstall();
+        MockCourierFactory.uninstall();
+    }
+
+    /**
+     *             |------------------ service1 ----------------|
+     *             |                                            |
+     * splitter1 --|                                            |-- aggregator2 --
+     *             |               |--service2--|               |
+     *             |-- splitter2 --|            |--aggregator1--|
+     *                             |--service3--|
+     */
+    public void test_nested_split() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, IOException, SAXException, ParserConfigurationException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+
+        // Manually deliver the message to the splitter1 service...
+        splitter1.process(messageIn);
+        AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+        assertNotNull(service1Message);
+
+        // Manually deliver the message in service1Courier into service1...
+        service1.process(service1Courier.messages.get(0));
+
+        // Manually deliver the message in splitter2Courier into splitter2...
+        splitter2.process(splitter2Courier.messages.get(0));
+        AggregationDetails service2Message = Aggregator.getAggregatorDetails(service2Courier.messages.get(0), 0);
+        assertNotNull(service2Message);
+
+        // Manually deliver the message in aggregator2Courier into aggregator2 (this is the message from service1)...
+        aggregator2.process(aggregator2Courier.messages.get(0));
+
+        // Manually deliver the message in service2Courier into service2...
+        service2.process(service2Courier.messages.get(0));
+
+        // Manually deliver the message in service3Courier into service3...
+        service3.process(service3Courier.messages.get(0));
+
+        // aggregator1Courier should have 2 messages in it... one from service2
+        // and one from service3...
+        assertEquals(2, aggregator1Courier.messages.size());
+
+        // Should be 2 aggrTags before aggregation...
+        List<String> aggrTags = Aggregator.getAggregatorTags(aggregator1Courier.messages.get(1));
+        assertEquals(2, aggrTags.size());
+
+        // Manually deliver the 2 messages to aggregator1...
+        aggregator1.process(aggregator1Courier.messages.get(0));
+        Message aggregator1Message = aggregator1.process(aggregator1Courier.messages.get(1));
+        assertNoAggregationTags(aggregator1Courier.messages.get(0));
+        assertNoAggregationTags(aggregator1Courier.messages.get(1));
+        aggrTags = Aggregator.getAggregatorTags(aggregator1Message);
+        assertEquals(1, aggrTags.size());
+        AggregationDetails aggregator1MessageAggrDetails = Aggregator.getAggregatorDetails(aggregator1Message, 0);
+        assertEquals(service1Message.getSeriesUuid(), aggregator1MessageAggrDetails.getSeriesUuid());
+
+        // Manually deliver the aggregator1Message to aggregator2...
+        Message aggregator2Message = aggregator2.process(aggregator1Message);
+        assertNoAggregationTags(aggregator1Message);
+        assertEquals(2, aggregator2Message.getAttachment().getUnnamedCount());
+        Message message1 = Util.deserialize((Serializable) aggregator2Message.getAttachment().itemAt(0));
+        assertNoAggregationTags(message1);
+        Message message2 = Util.deserialize((Serializable) aggregator2Message.getAttachment().itemAt(1));
+        assertNoAggregationTags(message2);
+    }
+
+    public void test_splitIds() throws ActionProcessingException {
+        // Should be allowed to pump messages with diff splitIds into aggregator2
+        // because there's no splitId set on it...
+        test_splitIds(aggregator2, "splitId-1", "splitId-2");
+
+        // Should get an error if I try the same with aggregator 3, coz it has
+        // a "splitId" set on it...
+        try {
+            test_splitIds(aggregator3, "splitId-x", "splitId-y");
+            fail("Expected ActionProcessingException");
+        } catch (ActionProcessingException e) {
+            assertEquals("Invalid aggregation config on aggregator 'aggregator3-config' .  This aggregator is configured to only aggregate message with an aggregation 'spliId' of 'splitId1'. The splitId on the received message is 'splitId-x'. A nested aggregation point may be missing, or may have been bypassed.", e.getMessage());
+        }
+
+        // The following should work because the splitIds match the split ID on
+        // aggregator3's config (see action-configs-02.xml).
+        test_splitIds(aggregator3, "splitId1", "splitId1");
+
+    }
+
+    private void test_splitIds(Aggregator aggregator, String splitId1, String splitId2) throws ActionProcessingException {
+        Message messageIn = MessageFactory.getInstance().getMessage();
+        AggregationDetails aggrDetails;
+        List<String> aggrTags = new ArrayList<String>();
+
+        aggrDetails = new AggregationDetails("xx", 1, 2, 123123);
+        aggrDetails.setSplitId(splitId1);
+        aggrTags.add(aggrDetails.toString());
+        Aggregator.setAggregatorTags(messageIn, aggrTags);
+        aggregator.process(messageIn);
+
+        aggrDetails = new AggregationDetails("cc", 1, 2, 123123);
+        aggrDetails.setSplitId(splitId1);
+        aggrTags.add(aggrDetails.toString());
+        Aggregator.setAggregatorTags(messageIn, aggrTags);
+        aggregator.process(messageIn);
+
+    }
+
+    private void assertNoAggregationTags(Message aggregator1Message) {
+        List<String> aggrTags;
+        aggrTags = Aggregator.getAggregatorTags(aggregator1Message);
+        assertNull(aggrTags);
+    }
+}
\ No newline at end of file


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.tests.XMLMessageUnitTest;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+*/
+class TestCourier extends MockCourier {
+
+    public List<Message> messages = new ArrayList<Message>();
+
+    public TestCourier() {
+        super(true);
+    }
+
+    public boolean deliver(Message message) throws CourierException, MalformedEPRException {
+        try {
+            String xmlRepresentation = XMLMessageUnitTest.msgToXML((MessageImpl)message);
+            messages.add(XMLMessageUnitTest.msgFromXML(xmlRepresentation));
+            return true;
+        } catch(Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+        return false;
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,43 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
+
+    <providers>
+        <bus-provider name="null"><bus busid="null"/></bus-provider>
+    </providers>
+
+    <services>
+        <service category="MessageRouting" name="SplitterService" description="Sends messages to N destinations">
+
+            <listeners>
+                <listener name="null-listener" busidref="null" />
+            </listeners>
+            
+            <actions>
+
+                <action name="splitter1-action" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="service1"/>
+                        <route-to service-category="test" service-name="service2"/>
+                    </property>
+                </action>
+
+                <action name="service1-config" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="aggregator"/>
+                    </property>
+                </action>
+
+                <action name="service2-config" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="aggregator"/>
+                    </property>
+                </action>
+
+                <action name="aggregator-config" class="org.jboss.soa.esb.actions.Aggregator">
+                    <property name="timeoutInMillies" value="2000"/>
+                </action>
+
+            </actions>
+        </service>
+    </services>
+</jbossesb>


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
___________________________________________________________________
Name: svn:mime-type
   + text/xml
Name: svn:eol-style
   + native

Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml	2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,66 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
+
+    <providers>
+        <bus-provider name="null"><bus busid="null"/></bus-provider>
+    </providers>
+
+    <services>
+        <service category="MessageRouting" name="SplitterService" description="Sends messages to N destinations">
+
+            <listeners>
+                <listener name="null-listener" busidref="null" />
+            </listeners>
+            
+            <actions>
+
+                <action name="splitter1-action" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="service1"/>
+                        <route-to service-category="test" service-name="splitter2"/>
+                    </property>
+                </action>
+
+                <action name="splitter2-action" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="service2"/>
+                        <route-to service-category="test" service-name="service3"/>
+                    </property>
+                </action>
+
+                <action name="service1-config" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="aggregator2"/>
+                    </property>
+                </action>
+
+                <action name="service2-config" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="aggregator1"/>
+                    </property>
+                </action>
+
+                <action name="service3-config" class="org.jboss.soa.esb.actions.StaticRouter">
+                    <property name="destinations">
+                        <route-to service-category="test" service-name="aggregator1"/>
+                    </property>
+                </action>
+
+                <action name="aggregator1-config" class="org.jboss.soa.esb.actions.Aggregator">
+                    <property name="timeoutInMillies" value="200000"/>
+                </action>
+
+                <action name="aggregator2-config" class="org.jboss.soa.esb.actions.Aggregator">
+                    <property name="timeoutInMillies" value="200000"/>
+                </action>
+
+                <!-- Used for splitId checking tests -->
+                <action name="aggregator3-config" class="org.jboss.soa.esb.actions.Aggregator">
+                    <property name="timeoutInMillies" value="200000"/>
+                    <property name="splitId" value="splitId1"/>
+                </action>
+
+            </actions>
+        </service>
+    </services>
+</jbossesb>


Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
___________________________________________________________________
Name: svn:mime-type
   + text/xml
Name: svn:eol-style
   + native

Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java	2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java	2007-11-21 16:52:43 UTC (rev 16733)
@@ -22,30 +22,19 @@
 
 package org.jboss.soa.esb.message.tests;
 
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.URL;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
 import junit.framework.TestCase;
-
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
 import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.PortReference;
 import org.jboss.soa.esb.addressing.eprs.*;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.body.content.BytesBody;
 import org.jboss.soa.esb.message.format.MessageFactory;
 import org.jboss.soa.esb.message.format.MessageType;
-import org.w3c.dom.Document;
-import org.xml.sax.InputSource;
+import org.jboss.soa.esb.util.Util;
 
-import com.sun.org.apache.xml.internal.serialize.OutputFormat;
-import com.sun.org.apache.xml.internal.serialize.XMLSerializer;
+import java.net.URL;
 
 /**
  * Unit tests for the Class class.
@@ -550,42 +539,12 @@
 	public static String msgToXML(final MessageImpl msg)
 		throws Exception
 	{
-		final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance() ;
-		
-		factory.setNamespaceAware(true);
-		
-		final DocumentBuilder builder = factory.newDocumentBuilder() ;
-		Document doc = builder.newDocument() ;
-		
-		doc = msg.toXML(doc) ;
-		
-		final StringWriter sWriter = new StringWriter() ;
-		final OutputFormat format = new OutputFormat() ;
-		format.setIndenting(true) ;
-
-		final XMLSerializer xmlS = new XMLSerializer(sWriter, format) ;
-
-		xmlS.asDOMSerializer() ;
-		xmlS.serialize(doc) ;
-
-		return sWriter.toString() ;
+		return (String) Util.serialize(msg);
 	}
 	
 	public static MessageImpl msgFromXML(final String xmlRepresentation)
 		throws Exception
 	{
-		final StringReader stringReader = new StringReader(xmlRepresentation) ;
-		final InputSource inputSource = new InputSource(stringReader) ;
-		
-		final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance() ;
-		
-		factory.setNamespaceAware(true);
-		
-		final DocumentBuilder builder = factory.newDocumentBuilder() ;
-		final Document doc = builder.parse(inputSource) ;
-
-		final MessageImpl message = new MessageImpl() ;
-		message.fromXML(doc) ;
-		return message ;
+        return (MessageImpl) Util.deserialize(xmlRepresentation);
 	}
 }




More information about the jboss-svn-commits mailing list