[jboss-svn-commits] JBL Code SVN: r16733 - in labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta: src/org/jboss/soa/esb/client and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Nov 21 11:52:43 EST 2007
Author: tfennelly
Date: 2007-11-21 11:52:43 -0500 (Wed, 21 Nov 2007)
New Revision: 16733
Added:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
Modified:
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java
labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1204
http://jira.jboss.com/jira/browse/JBESB-1201
http://jira.jboss.com/jira/browse/JBESB-1330
http://jira.jboss.com/jira/browse/JBESB-1331
http://jira.jboss.com/jira/browse/JBESB-746
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,157 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions;
+
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+
+/**
+ * Aggregation Details.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AggregationDetails {
+
+    private String seriesUuid;
+    private int messageNumber = -1;
+    private int seriesSize = -1;
+    private long seriesTimestamp = -1;
+    private String splitId;
+
+    /**
+     * Create the details from their individual parts.
+     * @param seriesUuid Identifier of the message series.
+     * @param messageNumber Number of this message within the series (1 or more).
+     * @param seriesSize Size of the series (not less than messageNumber).
+     * @param seriesTimestamp Timestamp of the series (1 or more).
+     * @throws IllegalArgumentException If a numeric argument is out of range.
+     */
+    public AggregationDetails(String seriesUuid, int messageNumber, int seriesSize, long seriesTimestamp) {
+        AssertArgument.isNotNullAndNotEmpty(seriesUuid, "seriesUuid");
+        require(messageNumber >= 1, "Invalid AggregatorDetails. messageNumber < 1");
+        require(seriesSize >= messageNumber, "Invalid AggregatorDetails. seriesSize < messageNumber");
+        require(seriesTimestamp >= 1, "Invalid AggregatorDetails. timestamp < 1");
+        this.seriesUuid = seriesUuid;
+        this.messageNumber = messageNumber;
+        this.seriesSize = seriesSize;
+        this.seriesTimestamp = seriesTimestamp;
+    }
+
+    /**
+     * Parse the details from a tag in the {@link #toString()} format:
+     * <code>seriesUuid:messageNumber:seriesSize:seriesTimestamp[:splitId]</code>.
+     *
+     * @param aggregatorTag The colon separated aggregator tag.
+     * @throws IllegalArgumentException If the tag is malformed or a value is out of range.
+     */
+    public AggregationDetails(String aggregatorTag) {
+        AssertArgument.isNotNullAndNotEmpty(aggregatorTag, "aggregatorTag");
+        String[] tokens = aggregatorTag.split(":");
+        require(tokens.length >= 4, "Invalid Aggregator Tag. Must have 4 tokens (colon separated).");
+        require(!tokens[0].trim().equals(""), "Invalid Aggregator Tag: seriesUuid is blank.");
+        this.seriesUuid = tokens[0];
+        this.messageNumber = parseInt(tokens[1], "Invalid Aggregator Tag. 'messageNumber' must be an int.");
+        require(messageNumber >= 1, "Invalid Aggregator Tag. messageNumber < 1");
+        this.seriesSize = parseInt(tokens[2], "Invalid Aggregator Tag. 'seriesSize' must be an int.");
+        require(seriesSize >= messageNumber, "Invalid Aggregator Tag. seriesSize < messageNumber");
+        try {
+            this.seriesTimestamp = Long.parseLong(tokens[3]);
+        } catch(NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid Aggregator Tag. 'timestamp' must be an long.", e);
+        }
+        require(seriesTimestamp >= 1, "Invalid Aggregator Tag. timestamp < 1");
+        if(tokens.length >= 5) {
+            // The splitId is everything after the 4th colon, so an id that itself contains
+            // a ':' is not truncated (or dropped) when a toString() value is parsed back.
+            this.splitId = aggregatorTag.split(":", 5)[4];
+        }
+    }
+
+    public String getSeriesUuid() {
+        return seriesUuid;
+    }
+
+    public int getSeriesSize() {
+        return seriesSize;
+    }
+
+    public int getMessageNumber() {
+        return messageNumber;
+    }
+
+    public long getSeriesTimestamp() {
+        return seriesTimestamp;
+    }
+
+    public String getSplitId() {
+        return splitId;
+    }
+
+    public void setSplitId(String splitId) {
+        this.splitId = splitId;
+    }
+
+    public boolean equals(Object obj) {
+        // Equality is defined on the tag string; instanceof also covers null.
+        return obj == this || (obj instanceof AggregationDetails && toString().equals(obj.toString()));
+    }
+
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /** Returns the colon separated tag, as parsed by {@link #AggregationDetails(String)}. */
+    public String toString() {
+        assertDetailsSet();
+        String tag = seriesUuid + ":" + messageNumber + ":" + seriesSize + ":" + seriesTimestamp;
+        return (splitId == null ? tag : tag + ":" + splitId);
+    }
+
+    private void assertDetailsSet() {
+        if(seriesUuid == null || seriesUuid.trim().equals("")) {
+            throw new IllegalStateException("AggregationDetails not set: 'seriesUuid' is null or empty.");
+        }
+        if(messageNumber == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'messageNumber' is not set.");
+        }
+        if(seriesSize == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'seriesSize' is not set.");
+        }
+        if(seriesTimestamp == -1) {
+            throw new IllegalStateException("AggregationDetails not set: 'timestamp' is not set.");
+        }
+    }
+
+    // Throws an IllegalArgumentException carrying the message when the condition does not hold.
+    private static void require(boolean condition, String message) {
+        if(!condition) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+
+    // Parses an int token, reporting a bad token with the message and the original cause.
+    private static int parseInt(String token, String message) {
+        try {
+            return Integer.parseInt(token);
+        } catch(NumberFormatException e) {
+            throw new IllegalArgumentException(message, e);
+        }
+    }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/AggregationDetails.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -27,32 +27,28 @@
*/
package org.jboss.soa.esb.actions;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.Service;
-import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.client.ServiceInvoker;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
+import static org.jboss.soa.esb.client.ServiceInvoker.DEAD_LETTER_SERVICE_NAME;
+import static org.jboss.soa.esb.client.ServiceInvoker.INTERNAL_SERVICE_CATEGORY;
import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
-import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* Simple Aggregator. The aggregator relies on 'aggregatorTags'. To puzzle the individual
* back together. The aggregatorTag is set in the MessageRouter.deliverAsync() method. The aggregator
@@ -73,26 +69,24 @@
public final static String AGGEGRATOR_TAG = "aggregatorTag";
public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
- private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
- = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
+ private Map<String, Map< Integer, Message > > aggregatedMessageMap
+ = new ConcurrentHashMap< String, Map< Integer, Message > >();
private TimeoutChecker _timeoutChecker=null;
- private ArrayList<String> _notified = new ArrayList<String>();
protected ConfigTree config;
private Logger logger = Logger.getLogger(Aggregator.class);
private Long timeoutInMillies=null;
- private String serviceName;
- private String serviceCategoryName;
- private Registry registry;
-
- private Aggregator(){}
+ private Set<String> receivedSplits = new HashSet<String>();
+ private String splitId;
+ private Aggregator(){}
+
public Aggregator(ConfigTree config) throws ConfigurationException, RegistryException
- {
-
+ {
this.config = config;
- checkMyParms();
- registry = RegistryFactory.getRegistry();
+ timeoutInMillies = Long.valueOf(config.getRequiredAttribute("timeoutInMillies"));
+ logger.debug("Aggregator config: timeoutInMillies=" + timeoutInMillies);
+ splitId = config.getAttribute("splitId");
}
/**
@@ -110,13 +104,17 @@
_timeoutChecker.start();
}
- /**
- * Destroy the action instance.
- * <p/>
- * This method is called prior to the release of the action instance. All
- * resources associated with this action instance should be released as the
- * instance will no longer be used.
- */
+ public Map<String, Map<Integer, Message>> getAggregatedMessageMap() {
+ return aggregatedMessageMap;
+ }
+
+ /**
+ * Destroy the action instance.
+ * <p/>
+ * This method is called prior to the release of the action instance. All
+ * resources associated with this action instance should be released as the
+ * instance will no longer be used.
+ */
public void destroy()
throws ActionLifecycleException
{
@@ -137,82 +135,135 @@
@SuppressWarnings("unchecked")
public Message process(Message message) throws ActionProcessingException
{
- ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
-
- if (aggregatorTags!=null && aggregatorTags.size()>0) {
- String aggregatorTag = (String) aggregatorTags.get(aggregatorTags.size()-1);
- //Removing the last tags and setting them as "the one in current use"
- if (aggregatorTags.size()>1) {
- aggregatorTags.remove(aggregatorTags.size()-1);
- message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
- } else {
- message.getProperties().remove(Aggregator.AGGEGRATOR_TAG);
- }
- String[] tag = aggregatorTag.split(":");
- String uuId = tag[0];
- String messageNumber = tag[1];
- int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
- Long splitterTimeStamp = Long.valueOf(tag[3]);
- message.getProperties().setProperty(SPLITTER_TIME_STAMP, splitterTimeStamp);
+ List<String> aggregatorTags = getAggregatorTags(message);
- if (isTimedOut(message)) {
- if (_aggregatedMessageMap.containsKey(uuId)) {
- ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
+ if (aggregatorTags != null && aggregatorTags.size() > 0) {
+ String aggregatorTag = aggregatorTags.get(aggregatorTags.size()-1);
+ AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag);
+
+ assertAggregationDetailsOK(aggrDetails);
+
+ // Set the timestamp on the message... used by the TimeoutChecker...
+ message.getProperties().setProperty(SPLITTER_TIME_STAMP, aggrDetails.getSeriesTimestamp());
+
+ Map<Integer, Message> messageMap = aggregatedMessageMap.get(aggrDetails.getSeriesUuid());
+ if (isTimedOut(aggrDetails.getSeriesTimestamp())) {
+ if (messageMap != null) {
//add the message in if we don't already have it
- if (!messageMap.containsKey(uuId)) {
- messageMap.put(messageNumber, message);
+ if (!messageMap.containsKey(aggrDetails.getMessageNumber())) {
+ messageMap.put(aggrDetails.getMessageNumber(), message);
}
//Just going to send out what we have this far, which will remove this key
//so subsequent messages with this uuId will be ignored
- message = createAggregateMessage(uuId, _aggregatedMessageMap.get(uuId));
+ message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
} else {
logger.debug("Ignoring this message since we are already timedout on this message.");
//ignoring this message
message = null;
}
} else {
- ConcurrentHashMap<String, Message> messageMap=null;
//Get the messageMap with this uuId, or create it if not found
- if (_aggregatedMessageMap.containsKey(uuId)) {
- messageMap = _aggregatedMessageMap.get(uuId);
- } else {
- messageMap = new ConcurrentHashMap<String, Message>();
+ if (messageMap == null) {
+ messageMap = new ConcurrentHashMap<Integer, Message>();
+ aggregatedMessageMap.put(aggrDetails.getSeriesUuid(), messageMap);
}
- if (messageMap.containsKey(messageNumber)) {
+ if (messageMap.containsKey(aggrDetails.getMessageNumber())) {
logger.warn("Received duplicate message, ignoring it but this should not happen.");
} else {
- messageMap.put(messageNumber, message);
- _aggregatedMessageMap.put(uuId, messageMap);
- }
- if (messageMap.size()==totalNumberOfMessages) {
- message = createAggregateMessage(uuId, messageMap);
+ messageMap.put(aggrDetails.getMessageNumber(), message);
+ }
+ if (messageMap.size() == aggrDetails.getSeriesSize()) {
+ message = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
} else {
message = null;
}
}
} else {
- throw new ActionProcessingException("Could not find an aggregator tag, so this message can not be aggregated.");
+ // Just aggregate the single message...
+ message = createAggregateMessage(message);
}
- return message;
+
+ return message;
}
- /**
- * Obtains the timeoutInMillies to a number
- *
- * @throws ConfigurationException
- */
- private void checkMyParms() throws ConfigurationException
- {
- timeoutInMillies = Long.valueOf(config.getAttribute("timeoutInMillies"));
- logger.debug("Aggregator config: timeoutInMillies=" + timeoutInMillies);
- serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+
+ private void assertAggregationDetailsOK(AggregationDetails aggrDetails) throws ActionProcessingException {
+ if(splitId != null) {
+ if(!splitId.equals(aggrDetails.getSplitId())) {
+ throw new ActionProcessingException("Invalid aggregation config on aggregator '" + config.getAttribute("action") + "' . " +
+ "This aggregator is configured " +
+ "to only aggregate message with an aggregation 'spliId' of '" + splitId + "'. " +
+ "The splitId on the received message is '" + aggrDetails.getSplitId() + "'. " +
+ "A nested aggregation point may be missing, or may have been bypassed.");
+ }
+ } else {
+ // log a warning if this aggregator starts receiving messages with
+ // different splitIds. This would suggest an error in the nested split configuration.
+ if(!receivedSplits.contains(aggrDetails.getSplitId())) {
+ receivedSplits.add(aggrDetails.getSplitId());
+ }
+
+ if(receivedSplits.size() > 1) {
+ logger.warn("Aggregator action '" + config.getAttribute("action") + "' has received " +
+ "split messages from more multiple 'splitId' sources: " + receivedSplits + "\n" +
+ "You may need to configure an intermediate/nested aggregator at " +
+ "some point in the message flow.");
+ }
+
+ }
}
+ public static List<String> getAggregatorTags(Message message) {
+ return (List<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
+ }
+
+ public static void setAggregatorTags(Message message, List<String> tags) {
+ if(tags != null) {
+ message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, tags);
+ } else {
+ message.getProperties().remove(Aggregator.AGGEGRATOR_TAG);
+ }
+ }
+
+ public static AggregationDetails getAggregatorDetails(Message message, int tagIndex) throws ActionProcessingException {
+ List<String> tags = getAggregatorTags(message);
+
+ if(tags == null || tags.isEmpty()) {
+ return null;
+ } else {
+ return new AggregationDetails(tags.get(tagIndex));
+ }
+ }
+
public static void decorate(Message message) {
}
/**
+ * Aggregates a single message into an aggregated message.
+ * <p/>
+ * This method is called for messages that are received without aggregation tags.
+ *
+ * @param message
+ * @return the aggregated message
+ */
+ @SuppressWarnings("unchecked")
+ public Message createAggregateMessage(Message message) throws ActionProcessingException {
+ // Create an aggregated message
+ Message aggregatedMessage = MessageFactory.getInstance().getMessage();
+
+ setAggregatorTags(message, null);
+ try {
+ aggregatedMessage.getAttachment().addItem(Util.serialize(message));
+ } catch (ParserConfigurationException e) {
+ throw new ActionProcessingException("Message attachment serialization failure", e);
+ } catch (IOException e) {
+ throw new ActionProcessingException("Message attachment serialization failure", e);
+ }
+
+ return aggregatedMessage;
+ }
+
+ /**
* Aggregates the messages into 1 new message with an attachment for each message.
*
* @param uuId
@@ -220,45 +271,65 @@
* @return the aggregated message
*/
@SuppressWarnings("unchecked")
- private Message createAggregateMessage(String uuId, ConcurrentHashMap<String, Message> messageMap)
- {
- //Create an aggregated message
+ public Message createAggregateMessage(String uuId, Map<Integer, Message> messageMap) throws ActionProcessingException {
+ // Create an aggregated message
Message aggregatedMessage = MessageFactory.getInstance().getMessage();
- boolean isFirstTime=true;
- for (Message message : messageMap.values()) {
- //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
- //Only need to get it from the first message, should be the same for the others.
- if (isFirstTime) {
- ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
- if (aggregatorTags!=null && aggregatorTags.size()>0) {
- aggregatedMessage.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
- }
- isFirstTime=false;
+
+ //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+ //Only need to get it from the first message, should be the same for the others.
+ List<String> aggregatedMessageTags = copyAggregationTags(messageMap);
+ setAggregatorTags(aggregatedMessage, aggregatedMessageTags);
+
+ for (Message attachmentMessage : messageMap.values()) {
+ //Add the individual messages as attachments
+ try {
+ // Clear the aggregation tags from the attachment message. Any future aggregation
+ // on the payload of these messages should be done within the context of the
+ // outer/aggregated message and its tags.
+ setAggregatorTags(attachmentMessage, null);
+ aggregatedMessage.getAttachment().addItem(Util.serialize(attachmentMessage));
+ } catch (ParserConfigurationException e) {
+ throw new ActionProcessingException("Message attachment serialization failure", e);
+ } catch (IOException e) {
+ throw new ActionProcessingException("Message attachment serialization failure", e);
}
- //Add the individual messages as attachments
- aggregatedMessage.getAttachment().addItem(message);
}
- _aggregatedMessageMap.remove(uuId);
+ aggregatedMessageMap.remove(uuId);
//TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
//remove from the notificationMap if it is in there.
- _notified.remove(uuId);
return aggregatedMessage;
}
+
+ private List<String> copyAggregationTags(Map<Integer, Message> messageMap) {
+ // Get the tags from the first message...
+ List<String> nestedAggregationTags = getAggregatorTags(messageMap.values().iterator().next());
+
+ if(nestedAggregationTags != null && nestedAggregationTags.size() > 1) {
+ // clone the tags, just in case they get zapped from the source list...
+ List<String> aggregatedMessageTags = new ArrayList<String>();
+
+ aggregatedMessageTags.addAll(nestedAggregationTags);
+ // remove the last one because that's related to the nested aggregation...
+ aggregatedMessageTags.remove(aggregatedMessageTags.size() - 1);
+
+ return aggregatedMessageTags;
+ }
+
+ return null;
+ }
+
/**
* If the aggregation process is complete then return true. This depends on the configuration.
*
- * @param totalNumberOfMessages
- * @param splitterTimeStamp
- * @param messageMap
- * @return
+ * @param splitterTimeStamp The splitter timestamp.
+ * @return True if the message is timed out, otherwise false.
*/
- private boolean isTimedOut(Message message)
+ private boolean isTimedOut(long splitterTimeStamp)
{
- long splitterTimeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
if (timeoutInMillies!=null) {
long now = new Date().getTime();
- long expiration = splitterTimeStamp + Long.valueOf(timeoutInMillies);
+ long expiration = splitterTimeStamp + timeoutInMillies;
if (logger.isDebugEnabled()) {
DateFormat dateFormat = DateFormat.getTimeInstance();
logger.debug("Current time=" + dateFormat.format(new Date(now))
@@ -274,7 +345,8 @@
return false;
}
/**
- * Checks for message that are timed out. If we find that one we notify ourselves about it by resending the message.
+ * Checks for message that are timed out. If we find that one, drop it in the DLQ and delete
+ * it from the map.
*
* @author kstam
*
@@ -282,39 +354,43 @@
class TimeoutChecker extends Thread {
private final Lock terminateLock = new ReentrantLock() ;
private final Condition terminateCondition = terminateLock.newCondition() ;
- private boolean terminated ;
-
+ private boolean terminated ;
+ ServiceInvoker dlQueueInvoker;
+
@SuppressWarnings("unchecked")
public void run() {
+ try {
+ dlQueueInvoker = new ServiceInvoker(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+ } catch (MessageDeliverException e) {
+ logger.error("Unable to initialise Dead Letter Channel Service Invoker for Aggregation timeout checker. Not using Dead Letter Channel.", e);
+ }
+
boolean running = true ;
while(running) {
- Service theService = new Service(serviceCategoryName, serviceName);
- ServiceInvoker theServiceInvoker = null;
-
//no need to check if no timeout is set
if (timeoutInMillies!=null) {
- for (ConcurrentHashMap< String, Message > messageMap : _aggregatedMessageMap.values()) {
+ for (Map<Integer, Message > messageMap : aggregatedMessageMap.values()) {
//Check the first message, they all have the same time stamp
Message message = messageMap.values().iterator().next();
- if (isTimedOut(message)) {
+ long timeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
+
+ if (isTimedOut(timeStamp)) {
//We found a timed-out message. Let's go notify ourselves about by resending a message,
//it if we haven't done so already
- ArrayList<String> aggregatorTag = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
- String uuId = aggregatorTag.get(0);
- if (!_notified.contains(uuId)) {
- _notified.add(uuId);
- logger.debug("Found timeout message.");
+ List<String> aggregatorTag = getAggregatorTags(message);
+ if(aggregatorTag != null && !aggregatorTag.isEmpty()) {
+ AggregationDetails aggrDetails = new AggregationDetails(aggregatorTag.get(aggregatorTag.size() - 1));
+
try {
- if (theServiceInvoker == null)
- theServiceInvoker = new ServiceInvoker(theService);
-
- theServiceInvoker.deliverAsync(message);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- //If we can't notify then drop this data
- logger.debug("Deleting data for message series with uuId=" + uuId);
- _notified.remove(uuId);
- _aggregatedMessageMap.remove(uuId);
+ logger.info("Deleting message aggregation series: " + aggrDetails.getSeriesUuid());
+ if(dlQueueInvoker != null) {
+ Message aggregateMessage = createAggregateMessage(aggrDetails.getSeriesUuid(), messageMap);
+ dlQueueInvoker.deliverAsync(aggregateMessage);
+ }
+ } catch(Throwable e) {
+ logger.error("Error delivering timed out aggregation message to Dead Letter Service.", e);
+ } finally {
+ aggregatedMessageMap.remove(aggrDetails.getSeriesUuid());
}
}
}
@@ -332,7 +408,7 @@
}
}
}
-
+
public void terminate() {
terminateLock.lock() ;
try
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java 2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -31,6 +31,7 @@
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.client.MessageMulticaster;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
@@ -68,6 +69,8 @@
*/
public void initialise() throws ActionLifecycleException
{
+ messageMulticaster = new MessageMulticaster(_config.getAttribute("action", "%unset%"));
+
ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
if (null == destList || destList.length < 1)
{
@@ -92,7 +95,7 @@
protected ConfigTree _config;
- protected org.jboss.soa.esb.client.MessageMulticaster messageMulticaster = new org.jboss.soa.esb.client.MessageMulticaster();
+ protected MessageMulticaster messageMulticaster;
protected static Logger _logger = Logger.getLogger(StaticWiretap.class);
}
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java 2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/src/org/jboss/soa/esb/client/MessageMulticaster.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -19,13 +19,14 @@
*/
package org.jboss.soa.esb.client;
-import org.jboss.soa.esb.message.Message;
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.actions.AggregationDetails;
import org.jboss.soa.esb.actions.Aggregator;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.internal.soa.esb.assertion.AssertArgument;
-import org.apache.log4j.Logger;
import java.util.*;
@@ -48,8 +49,24 @@
private static Logger logger = Logger.getLogger(MessageMulticaster.class);
private Map<Service, ServiceInvoker> invokers = new LinkedHashMap<Service, ServiceInvoker>();
+ private String splitId;
/**
+ * Public default constructor.
+ */
+ public MessageMulticaster() {
+ }
+
+ /**
+ * Public constructor.
+ * @param splitId Split ID for this multicaster.
+ */
+ public MessageMulticaster(String splitId) {
+ AssertArgument.isNotNullAndNotEmpty(splitId, "splitId");
+ this.splitId = splitId;
+ }
+
+ /**
* Add a message recipient Service.
* @param service Recipient service for receipt of messages from this miltcaster instance.
* @throws RegistryException Failed to lookup Service endpoint.
@@ -102,26 +119,23 @@
* @throws MessageDeliverException Failed to deliver message to endpoint.
*/
public void sendToSubset(Message message, List<Service> recipients) throws RegistryException, MessageDeliverException {
- if (recipients.isEmpty())
- logger.warn("MessageMulticaster.sendToSubset: empty recipients list!");
- else
- {
- String uuId = UUID.randomUUID().toString();
- ArrayList<String> aggregatorTags = new ArrayList<String>();
+ if (recipients.isEmpty()) {
+ logger.warn("MessageMulticaster.sendToSubset: empty recipients list!");
+ } else{
+ String seriesUUID = UUID.randomUUID().toString();
+ long seriesTimestamp = System.currentTimeMillis();
int recipientCount = recipients.size();
- long timestamp = System.currentTimeMillis();
-
+
for(int i = 0; i < recipientCount; i++) {
Service recipient = recipients.get(i);
ServiceInvoker invoker = getInvoker(recipient);
- String tag = uuId + ":" + (i + 1) + ":" + recipientCount + ":" + timestamp;
-
- aggregatorTags.add(tag);
- message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
- if (logger.isDebugEnabled()) {
- logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + tag);
+
+ if(recipientCount > 1) {
+ // Only add aggregation info if we're splitting the message i.e. sending it to
+ // more than 1 recipient...
+ addAggregationDetails(message, seriesUUID, recipientCount, seriesTimestamp, i + 1);
}
-
+
if(invoker == null) {
logger.error("Service '" + recipient + "' is not in recipient list. Delivering message to Dead Letter Channel.");
ServiceInvoker.deliverToDeadLetterService(message);
@@ -134,10 +148,33 @@
}
}
}
- }
+ }
}
- private org.jboss.soa.esb.client.ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
+ /**
+ * Add the aggregation tag for one recipient to the message's
+ * Aggregator.AGGEGRATOR_TAG property list.
+ * <p/>
+ * sendToSubset passes the same message instance for every recipient, so for every
+ * recipient after the first, the tag added for the previous recipient is removed
+ * before the new one is appended.
+ * @param message Message being sent.
+ * @param uuId UUID of the message series.
+ * @param recipientCount Number of recipients in the series.
+ * @param seriesTimestamp Timestamp of the series.
+ * @param messageIndex Position of this recipient in the series (sendToSubset passes i + 1, so it starts at 1).
+ */
+ private void addAggregationDetails(Message message, String uuId, int recipientCount, long seriesTimestamp, int messageIndex) {
+ AggregationDetails aggrDetails = new AggregationDetails(uuId, messageIndex, recipientCount, seriesTimestamp);
+ // NOTE(review): unchecked cast - assumes the property, when set, holds an ArrayList<String>.
+ ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(Aggregator.AGGEGRATOR_TAG);
+
+ // This is useful during aggregation - as a way of id'ing where the split occurred.
+ aggrDetails.setSplitId(splitId);
+
+ if(aggregatorTags == null) {
+ // First tag on this message: create the list and attach it to the message properties.
+ aggregatorTags = new ArrayList<String>();
+ message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
+ }
+
+ if(messageIndex > 1) {
+ // remove the tag string added for the previous recipient (same message instance is reused)...
+ aggregatorTags.remove(aggregatorTags.size() - 1);
+ }
+ aggregatorTags.add(aggrDetails.toString());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + aggrDetails);
+ }
+ }
+
+ private ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
ServiceInvoker invoker = invokers.get(recipient);
// We lazilly create the invokers...
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions;
+
+import junit.framework.TestCase;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AggregationDetailsUnitTest extends TestCase {
+
+ public void test_multi_args() throws MessageDeliverException {
+ test_invalid_multi_args("", 1, 1, 1, "null or empty 'seriesUuid' arg in method call.");
+ test_invalid_multi_args("x", -1, 1, 1, "Invalid AggregatorDetails. messageNumber < 1");
+ test_invalid_multi_args("x", 1, -1, 1, "Invalid AggregatorDetails. seriesSize < messageNumber");
+ test_invalid_multi_args("x", 1, 1, -1, "Invalid AggregatorDetails. timestamp < 1");
+
+ // This should work...
+ new AggregationDetails("x", 1, 1, 1);
+ }
+
+ private void test_invalid_multi_args(String seriesUuid, int messageNumber, int seriesSize, long seriesTimestamp, String errorMessage) {
+ try {
+ new AggregationDetails(seriesUuid, messageNumber, seriesSize, seriesTimestamp);
+ fail("Expected IllegalArgumentException/MessageDeliverException");
+ } catch(IllegalArgumentException e) {
+ assertEquals(errorMessage, e.getMessage());
+ }
+ }
+
+ public void test_tag_arg() throws MessageDeliverException {
+ test_invalid_tag_arg("", "null or empty 'aggregatorTag' arg in method call.");
+ test_invalid_tag_arg("1:1:1", "Invalid Aggregator Tag. Must have 4 tokens (colon separated).");
+ test_invalid_tag_arg(":1:1:1", "Invalid Aggregator Tag: seriesUuid is blank.");
+ test_invalid_tag_arg("xxx:x:1:1", "Invalid Aggregator Tag. 'messageNumber' must be an int.");
+ test_invalid_tag_arg("xxx:1:x:1", "Invalid Aggregator Tag. 'seriesSize' must be an int.");
+ test_invalid_tag_arg("xxx:1:1:x", "Invalid Aggregator Tag. 'timestamp' must be an long.");
+ test_invalid_tag_arg("xxx:-1:1:1", "Invalid Aggregator Tag. messageNumber < 1");
+ test_invalid_tag_arg("xxx:1:-1:1", "Invalid Aggregator Tag. seriesSize < messageNumber");
+ test_invalid_tag_arg("xxx:1:1:-1", "Invalid Aggregator Tag. timestamp < 1");
+
+ // This should work...
+ new AggregationDetails("xxx:1:1:1");
+ }
+
+ private void test_invalid_tag_arg(String tag, String errorMessage) {
+ try {
+ new AggregationDetails(tag);
+ fail("Expected IllegalArgumentException/MessageDeliverException");
+ } catch(IllegalArgumentException e) {
+ assertEquals(errorMessage, e.getMessage());
+ }
+ }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/AggregationDetailsUnitTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,280 @@
+/*
+ Milyn - Copyright (C) 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License (version 2.1) as published by the Free Software
+ Foundation.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+ See the GNU Lesser General Public License for more details:
+ http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+
+import java.util.Map;
+
+/**
+ * Tests for JBESB-1201 re timeout management.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ * |----- service1 -----|
+ * | |
+ * splitter --| |-- aggregator
+ * | |
+ * |----- service2 -----|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class JBESB_1201_UnitTest extends TestCase {
+
+ private TestCourier service1Courier;
+ private TestCourier service2Courier;
+ private TestCourier aggregatorCourier;
+ private TestCourier dlqServiceCourier;
+
+ private StaticRouter splitter;
+ private StaticRouter service1;
+ private StaticRouter service2;
+ private Aggregator aggregator;
+
+ protected void setUp() throws Exception {
+ MockCourierFactory.install();
+ MockRegistry.install();
+
+ service1Courier = new TestCourier();
+ service2Courier = new TestCourier();
+ aggregatorCourier = new TestCourier();
+ dlqServiceCourier = new TestCourier();
+
+ MockRegistry.register("test", "service1", service1Courier);
+ MockRegistry.register("test", "service2", service2Courier);
+ MockRegistry.register("test", "aggregator", aggregatorCourier);
+ MockRegistry.register(ServiceInvoker.INTERNAL_SERVICE_CATEGORY, ServiceInvoker.DEAD_LETTER_SERVICE_NAME, dlqServiceCourier);
+
+ initaliseServices();
+ }
+
+ private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+ ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-01.xml"));
+ ConfigTree splitterConfig = esbConfig.getActionConfig("null-listener", "splitter1-action");
+ ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+ ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+ ConfigTree aggregatorConfig = esbConfig.getActionConfig("null-listener", "aggregator-config");
+
+ splitter = new StaticRouter(splitterConfig);
+ splitter.initialise();
+ service1 = new StaticRouter(service1Config);
+ service1.initialise();
+ service2 = new StaticRouter(service2Config);
+ service2.initialise();
+ aggregator = new Aggregator(aggregatorConfig);
+ aggregator.initialise();
+ }
+
+ protected void tearDown() throws Exception {
+ splitter.destroy();
+ service1.destroy();
+ service2.destroy();
+ aggregator.destroy();
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ }
+
+ public void test_not_timed_out() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+
+ // Get the aggregators message map and make sure it's empty...
+ Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Manually deliver the message to the splitter service...
+ splitter.process(messageIn);
+ AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+ assertNotNull(service1Message);
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // Manually deliver the message in service2Courier into service2...
+ service2.process(service2Courier.messages.get(0));
+
+ // 2 messages should arrive at the aggregatorCourier...
+ assertEquals(2, aggregatorCourier.messages.size());
+
+ // Aggregators message map should be empty...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Pump the 1st message into the aggregator...
+ aggregator.process(aggregatorCourier.messages.get(0));
+ assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+ // Pump the 2nd (last) message into the aggregator...
+ Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
+
+ assertNotNull(aggregateMessage);
+ assertEquals(2, aggregateMessage.getAttachment().getUnnamedCount());
+
+ // Make sure the aggregators message map is empty again...
+ assertTrue(aggrMessageMap.isEmpty());
+ }
+
+ public void test_timed_out_non_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+
+ // Get the aggregators message map and make sure it's empty...
+ Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Manually deliver the message to the splitter service...
+ splitter.process(messageIn);
+ AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+ assertNotNull(service1Message);
+
+ // The aggregator timeout is 2000ms... sleep here for 3000ms to force all
+ // messages delivered to the aggregator to be timed out - should all be
+ // ignored...
+ Thread.sleep(3000);
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // Manually deliver the message in service2Courier into service2...
+ service2.process(service2Courier.messages.get(0));
+
+ // 2 messages should arrive at the aggregatorCourier...
+ assertEquals(2, aggregatorCourier.messages.size());
+
+ // Aggregators message map should be empty...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Pump the 1st message into the aggregator...
+ aggregator.process(aggregatorCourier.messages.get(0));
+
+ // Aggregators message map should still be empty because the message should
+ // have been ignored...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Pump the 2nd (last) message into the aggregator...
+ aggregator.process(aggregatorCourier.messages.get(1));
+
+ // Aggregators message map should still be empty because the message should
+ // have been ignored...
+ assertTrue(aggrMessageMap.isEmpty());
+ }
+
+ public void test_timed_out_some_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+
+ // Get the aggregators message map and make sure it's empty...
+ Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Manually deliver the message to the splitter service...
+ splitter.process(messageIn);
+ AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+ assertNotNull(service1Message);
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // 1 message should arrive at the aggregatorCourier...
+ assertEquals(1, aggregatorCourier.messages.size());
+
+ // Aggregators message map should be empty...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Pump the 1st message into the aggregator and make sure its
+ // details are added properly to the map...
+ aggregator.process(aggregatorCourier.messages.get(0));
+ assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+ // The dlqServiceCourier should be empty...
+ assertEquals(0, dlqServiceCourier.messages.size());
+
+ // The aggregator timeout is 2000ms... sleep here for 3000ms to force the 2nd
+ // message delivered to the aggregator to be timed out...
+ Thread.sleep(3000);
+
+ // Should have timed out and "notified" on the first message...
+ assertEquals(1, dlqServiceCourier.messages.size());
+ // Aggregators message map should be empty again...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Manually deliver the message in service2Courier into service2...
+ service2.process(service2Courier.messages.get(0));
+
+ // 2 messages should be at the aggregatorCourier...
+ assertEquals(2, aggregatorCourier.messages.size());
+
+ // Pump the 2nd message into the aggregator...
+ Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
+ assertNull(aggregateMessage);
+
+ // Aggregators message map should be empty because the message should
+ // have been ignored...
+ assertTrue(aggrMessageMap.isEmpty());
+ }
+
+ public void test_timeoutchecker() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+
+ // Get the aggregators message map and make sure it's empty...
+ Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Manually deliver the message to the splitter service...
+ splitter.process(messageIn);
+ AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+ assertNotNull(service1Message);
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // 1 message should arrive at the aggregatorCourier...
+ assertEquals(1, aggregatorCourier.messages.size());
+
+ // Aggregators message map should be empty...
+ assertTrue(aggrMessageMap.isEmpty());
+
+ // Pump the message into the aggregator and make sure its
+ // details are added properly to the map...
+ aggregator.process(aggregatorCourier.messages.get(0));
+ assertService1MessageDetailsOK(aggrMessageMap, service1Message);
+
+ // The aggregator timeout is 2000ms... sleep here for 4000ms...
+ Thread.sleep(4000);
+
+ // Aggregators message map should be empty because the message should
+ // have timed out...
+ assertTrue("Message didn't get removed from map after timeout", aggrMessageMap.isEmpty());
+ }
+
+ private void assertService1MessageDetailsOK(Map<String, Map<Integer, Message>> aggrMessageMap, AggregationDetails service1Message) {
+ // Aggregators message map should have 1 entry for the above message...
+ assertEquals(1, aggrMessageMap.size());
+ Map<Integer, Message> messageSeries = aggrMessageMap.get(service1Message.getSeriesUuid());
+ assertNotNull(messageSeries);
+ assertTrue(messageSeries.get(service1Message.getMessageNumber()) != null);
+ }
+}
\ No newline at end of file
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1201_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,168 @@
+/*
+ Milyn - Copyright (C) 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License (version 2.1) as published by the Free Software
+ Foundation.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+ See the GNU Lesser General Public License for more details:
+ http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+
+import java.util.List;
+
+/**
+ * Tests for JBESB-1204 and JBESB-1331 re aggregation tag info.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ * |----- service1 -----|
+ * | |
+ * splitter --| |-- aggregator
+ * | |
+ * |----- service2 -----|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class JBESB_1204_1331_UnitTest extends TestCase {
+
+ private TestCourier service1Courier;
+ private TestCourier service2Courier;
+ private TestCourier aggregatorCourier;
+
+ private StaticRouter splitter;
+ private StaticRouter service1;
+ private StaticRouter service2;
+ private Aggregator aggregator;
+
+ protected void setUp() throws Exception {
+ MockCourierFactory.install();
+ MockRegistry.install();
+
+ service1Courier = new TestCourier();
+ service2Courier = new TestCourier();
+ aggregatorCourier = new TestCourier();
+
+ MockRegistry.register("test", "service1", service1Courier);
+ MockRegistry.register("test", "service2", service2Courier);
+ MockRegistry.register("test", "aggregator", aggregatorCourier);
+
+ initaliseServices();
+ }
+
+ private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+ ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-01.xml"));
+ ConfigTree splitterConfig = esbConfig.getActionConfig("null-listener", "splitter1-action");
+ ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+ ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+ ConfigTree aggregatorConfig = esbConfig.getActionConfig("null-listener", "aggregator-config");
+
+ splitter = new StaticRouter(splitterConfig);
+ splitter.initialise();
+ service1 = new StaticRouter(service1Config);
+ service1.initialise();
+ service2 = new StaticRouter(service2Config);
+ service2.initialise();
+ aggregator = new Aggregator(aggregatorConfig);
+ aggregator.initialise();
+ }
+
+ protected void tearDown() throws Exception {
+ splitter.destroy();
+ service1.destroy();
+ service2.destroy();
+ aggregator.destroy();
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ }
+
+ public void test() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+ List<String> aggrTags;
+ AggregationDetails aggrDetailsS1;
+ AggregationDetails aggrDetailsS2;
+ AggregationDetails aggrDetailsAggrM1;
+ AggregationDetails aggrDetailsAggrM2;
+
+ // Manually deliver the message to the splitter service...
+ splitter.process(messageIn);
+
+ // There should be 2 msssages delivered.. one to service1 and one service2...
+ assertEquals(1, service1Courier.messages.size());
+ assertEquals(1, service2Courier.messages.size());
+
+ // The 2 messages should each have 1 aggregation info string (no more)...
+ aggrTags = Aggregator.getAggregatorTags(service1Courier.messages.get(0));
+ assertEquals(1, aggrTags.size());
+ aggrDetailsS1 = new AggregationDetails(aggrTags.get(0));
+ aggrTags = Aggregator.getAggregatorTags(service2Courier.messages.get(0));
+ assertEquals(1, aggrTags.size());
+ aggrDetailsS2 = new AggregationDetails(aggrTags.get(0));
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // Manually deliver the message in service2Courier into service2...
+ service2.process(service2Courier.messages.get(0));
+
+ // 2 messages should arrive at the aggregatorCourier...
+ assertEquals(2, aggregatorCourier.messages.size());
+
+ // Extract aggr details from first message...
+ aggrTags = Aggregator.getAggregatorTags(aggregatorCourier.messages.get(0));
+ assertEquals(1, aggrTags.size());
+ aggrDetailsAggrM1 = new AggregationDetails(aggrTags.get(0));
+
+ // Extract aggr details from second message...
+ aggrTags = Aggregator.getAggregatorTags(aggregatorCourier.messages.get(1));
+ assertEquals(1, aggrTags.size());
+ aggrDetailsAggrM2 = new AggregationDetails(aggrTags.get(0));
+
+ // make sure all the UUIDs match...
+ assertEquals(aggrDetailsS1.getSeriesUuid(), aggrDetailsS2.getSeriesUuid());
+ assertEquals(aggrDetailsS2.getSeriesUuid(), aggrDetailsAggrM1.getSeriesUuid());
+ assertEquals(aggrDetailsAggrM1.getSeriesUuid(), aggrDetailsAggrM2.getSeriesUuid());
+
+ // make sure all the timestamps match...
+ assertEquals(aggrDetailsS1.getSeriesTimestamp(), aggrDetailsS2.getSeriesTimestamp());
+ assertEquals(aggrDetailsS2.getSeriesTimestamp(), aggrDetailsAggrM1.getSeriesTimestamp());
+ assertEquals(aggrDetailsAggrM1.getSeriesTimestamp(), aggrDetailsAggrM2.getSeriesTimestamp());
+
+ // make sure the series size matches...
+ assertEquals(aggrDetailsS1.getSeriesSize(), aggrDetailsS2.getSeriesSize());
+ assertEquals(aggrDetailsS2.getSeriesSize(), aggrDetailsAggrM1.getSeriesSize());
+ assertEquals(aggrDetailsAggrM1.getSeriesSize(), aggrDetailsAggrM2.getSeriesSize());
+
+ // make sure the message num's don't match, and that they're 1 or 2...
+ assertNotSame(aggrDetailsAggrM1.getMessageNumber(), aggrDetailsAggrM2.getMessageNumber());
+ assertTrue(aggrDetailsAggrM1.getMessageNumber() == 1 || aggrDetailsAggrM1.getMessageNumber() == 2);
+ assertTrue(aggrDetailsAggrM2.getMessageNumber() == 1 || aggrDetailsAggrM2.getMessageNumber() == 2);
+
+ // make sure the splitId matches...
+ assertEquals("splitter1-action", aggrDetailsS1.getSplitId());
+ assertEquals(aggrDetailsS1.getSplitId(), aggrDetailsS2.getSplitId());
+ assertEquals(aggrDetailsS2.getSplitId(), aggrDetailsAggrM1.getSplitId());
+ assertEquals(aggrDetailsAggrM1.getSplitId(), aggrDetailsAggrM2.getSplitId());
+ }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/JBESB_1204_1331_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,237 @@
+/*
+ Milyn - Copyright (C) 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License (version 2.1) as published by the Free Software
+ Foundation.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+
+ See the GNU Lesser General Public License for more details:
+ http://www.gnu.org/licenses/lgpl.txt
+*/
+package org.jboss.soa.esb.actions.aggregation;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.*;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.testutils.ESBConfigUtil;
+import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for JBESB-1204 and JBESB-1331.
+ * <p/>
+ * Make sure the message aggregation info flows in the following scenario...
+ * <pre>
+ *
+ * |------------------ service1 ----------------|
+ * | |
+ * splitter1 --| |-- aggregator2 --
+ * | |--service2--| |
+ * |-- splitter2 --| |--aggregator1--|
+ * |--service3--|
+ *
+ * </pre>
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class Nested_Splits_UnitTest extends TestCase {
+
+ private TestCourier splitter2Courier; // no need for a courier for splitter1
+ private TestCourier service1Courier;
+ private TestCourier service2Courier;
+ private TestCourier service3Courier;
+ private TestCourier aggregator1Courier;
+ private TestCourier aggregator2Courier;
+
+ private StaticRouter splitter1;
+ private StaticRouter splitter2;
+ private StaticRouter service1;
+ private StaticRouter service2;
+ private StaticRouter service3;
+ private Aggregator aggregator1;
+ private Aggregator aggregator2;
+ private Aggregator aggregator3;
+
+ protected void setUp() throws Exception {
+ MockCourierFactory.install();
+ MockRegistry.install();
+
+ splitter2Courier = new TestCourier();
+ service1Courier = new TestCourier();
+ service2Courier = new TestCourier();
+ service3Courier = new TestCourier();
+ aggregator1Courier = new TestCourier();
+ aggregator2Courier = new TestCourier();
+
+ MockRegistry.register("test", "splitter2", splitter2Courier);
+ MockRegistry.register("test", "service1", service1Courier);
+ MockRegistry.register("test", "service2", service2Courier);
+ MockRegistry.register("test", "service3", service3Courier);
+ MockRegistry.register("test", "aggregator1", aggregator1Courier);
+ MockRegistry.register("test", "aggregator2", aggregator2Courier);
+
+ initaliseServices();
+ }
+
+ private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException {
+ ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-02.xml"));
+ ConfigTree splitter1Config = esbConfig.getActionConfig("null-listener", "splitter1-action");
+ ConfigTree splitter2Config = esbConfig.getActionConfig("null-listener", "splitter2-action");
+ ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
+ ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
+ ConfigTree service3Config = esbConfig.getActionConfig("null-listener", "service3-config");
+ ConfigTree aggregator1Config = esbConfig.getActionConfig("null-listener", "aggregator1-config");
+ ConfigTree aggregator2Config = esbConfig.getActionConfig("null-listener", "aggregator2-config");
+ ConfigTree aggregator3Config = esbConfig.getActionConfig("null-listener", "aggregator3-config");
+
+ splitter1 = new StaticRouter(splitter1Config);
+ splitter1.initialise();
+ splitter2 = new StaticRouter(splitter2Config);
+ splitter2.initialise();
+ service1 = new StaticRouter(service1Config);
+ service1.initialise();
+ service2 = new StaticRouter(service2Config);
+ service2.initialise();
+ service3 = new StaticRouter(service3Config);
+ service3.initialise();
+ aggregator1 = new Aggregator(aggregator1Config);
+ aggregator1.initialise();
+ aggregator2 = new Aggregator(aggregator2Config);
+ aggregator2.initialise();
+ aggregator3 = new Aggregator(aggregator3Config);
+ aggregator3.initialise();
+ }
+
+ /**
+ * Destroys every action initialised in initaliseServices() and uninstalls the
+ * mock registry and courier factory, so nothing is left initialised between tests.
+ */
+ protected void tearDown() throws Exception {
+ splitter1.destroy();
+ splitter2.destroy();
+ service1.destroy();
+ service2.destroy();
+ service3.destroy();
+ aggregator1.destroy();
+ aggregator2.destroy();
+ aggregator3.destroy();
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ }
+
+ /**
+ * Walks a single message by hand through a nested split/aggregate topology and
+ * checks the aggregation tags at each stage:
+ * <pre>
+ *             |------------------ service1 ----------------|
+ *             |                                            |
+ * splitter1 --|                                            |-- aggregator2 --
+ *             |               |--service2--|               |
+ *             |-- splitter2 --|            |--aggregator1--|
+ *                             |--service3--|
+ * </pre>
+ * Each hop is driven manually: the message captured by one test courier is
+ * passed to the process method of the next action.
+ */
+ public void test_nested_split() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, IOException, SAXException, ParserConfigurationException {
+ Message messageIn = MessageFactory.getInstance().getMessage();
+
+ // Manually deliver the message to the splitter1 service...
+ splitter1.process(messageIn);
+ // Keep the first aggregation tag details of the message routed to service1;
+ // its series UUID is compared against aggregator1's output further down...
+ AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0);
+ assertNotNull(service1Message);
+
+ // Manually deliver the message in service1Courier into service1...
+ service1.process(service1Courier.messages.get(0));
+
+ // Manually deliver the message in splitter2Courier into splitter2...
+ splitter2.process(splitter2Courier.messages.get(0));
+ AggregationDetails service2Message = Aggregator.getAggregatorDetails(service2Courier.messages.get(0), 0);
+ assertNotNull(service2Message);
+
+ // Manually deliver the message in aggregator2Courier into aggregator2 (this is the message from service1)...
+ aggregator2.process(aggregator2Courier.messages.get(0));
+
+ // Manually deliver the message in service2Courier into service2...
+ service2.process(service2Courier.messages.get(0));
+
+ // Manually deliver the message in service3Courier into service3...
+ service3.process(service3Courier.messages.get(0));
+
+ // aggregator1Courier should have 2 messages in it... one from service2
+ // and one from service3...
+ assertEquals(2, aggregator1Courier.messages.size());
+
+ // Should be 2 aggrTags before aggregation...
+ List<String> aggrTags = Aggregator.getAggregatorTags(aggregator1Courier.messages.get(1));
+ assertEquals(2, aggrTags.size());
+
+ // Manually deliver the 2 messages to aggregator1...
+ aggregator1.process(aggregator1Courier.messages.get(0));
+ Message aggregator1Message = aggregator1.process(aggregator1Courier.messages.get(1));
+ // Neither input message should have any aggregation tags left on it after
+ // being processed...
+ assertNoAggregationTags(aggregator1Courier.messages.get(0));
+ assertNoAggregationTags(aggregator1Courier.messages.get(1));
+ // The message returned by aggregator1 should carry exactly 1 tag, and that
+ // tag should be in the same series as the message that went to service1...
+ aggrTags = Aggregator.getAggregatorTags(aggregator1Message);
+ assertEquals(1, aggrTags.size());
+ AggregationDetails aggregator1MessageAggrDetails = Aggregator.getAggregatorDetails(aggregator1Message, 0);
+ assertEquals(service1Message.getSeriesUuid(), aggregator1MessageAggrDetails.getSeriesUuid());
+
+ // Manually deliver the aggregator1Message to aggregator2...
+ Message aggregator2Message = aggregator2.process(aggregator1Message);
+ assertNoAggregationTags(aggregator1Message);
+ // The message returned by aggregator2 should hold 2 unnamed attachments, each
+ // of which deserializes to a message with no aggregation tags...
+ assertEquals(2, aggregator2Message.getAttachment().getUnnamedCount());
+ Message message1 = Util.deserialize((Serializable) aggregator2Message.getAttachment().itemAt(0));
+ assertNoAggregationTags(message1);
+ Message message2 = Util.deserialize((Serializable) aggregator2Message.getAttachment().itemAt(1));
+ assertNoAggregationTags(message2);
+ }
+
+ /**
+  * Checks how the "splitId" configured on an aggregator restricts the
+  * messages it will process.
+  */
+ public void test_splitIds() throws ActionProcessingException {
+     // aggregator2 has no splitId configured, so pumping messages with
+     // different splitIds into it should be allowed...
+     test_splitIds(aggregator2, "splitId-1", "splitId-2");
+
+     // aggregator3 does have a "splitId" configured, so the same exercise
+     // with non-matching splitIds should be rejected with this message...
+     final String expectedMessage = "Invalid aggregation config on aggregator 'aggregator3-config' . This aggregator is configured to only aggregate message with an aggregation 'spliId' of 'splitId1'. The splitId on the received message is 'splitId-x'. A nested aggregation point may be missing, or may have been bypassed.";
+     try {
+         test_splitIds(aggregator3, "splitId-x", "splitId-y");
+         fail("Expected ActionProcessingException");
+     } catch (ActionProcessingException rejected) {
+         assertEquals(expectedMessage, rejected.getMessage());
+     }
+
+     // ...whereas splitIds matching the one on aggregator3's config
+     // (see action-configs-02.xml) should go through.
+     test_splitIds(aggregator3, "splitId1", "splitId1");
+ }
+
+ /**
+  * Delivers one message to the supplied aggregator twice, adding a new
+  * aggregation tag before each delivery. The tag list is cumulative, so the
+  * second delivery carries both tags.
+  *
+  * @param aggregator the aggregator under test
+  * @param splitId1 split ID set on the tag added before the first delivery
+  * @param splitId2 split ID set on the tag added before the second delivery
+  * @throws ActionProcessingException if the aggregator rejects a delivery
+  */
+ private void test_splitIds(Aggregator aggregator, String splitId1, String splitId2) throws ActionProcessingException {
+     Message messageIn = MessageFactory.getInstance().getMessage();
+     List<String> aggrTags = new ArrayList<String>();
+     AggregationDetails aggrDetails;
+
+     // First delivery: a single tag carrying splitId1...
+     aggrDetails = new AggregationDetails("xx", 1, 2, 123123);
+     aggrDetails.setSplitId(splitId1);
+     aggrTags.add(aggrDetails.toString());
+     Aggregator.setAggregatorTags(messageIn, aggrTags);
+     aggregator.process(messageIn);
+
+     // Second delivery: a further tag carrying splitId2. This used to set
+     // splitId1 again, which left the splitId2 argument unused and meant the
+     // "different splitIds" cases never used a second, different ID...
+     aggrDetails = new AggregationDetails("cc", 1, 2, 123123);
+     aggrDetails.setSplitId(splitId2);
+     aggrTags.add(aggrDetails.toString());
+     Aggregator.setAggregatorTags(messageIn, aggrTags);
+     aggregator.process(messageIn);
+ }
+
+ /**
+  * Asserts that Aggregator.getAggregatorTags returns null for the supplied
+  * message.
+  *
+  * @param aggregator1Message the message to check
+  */
+ private void assertNoAggregationTags(Message aggregator1Message) {
+     assertNull(Aggregator.getAggregatorTags(aggregator1Message));
+ }
+}
\ No newline at end of file
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/Nested_Splits_UnitTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.tests.XMLMessageUnitTest;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+/**
+ * Mock courier that keeps a copy of every message handed to it. Each copy is
+ * made by passing the message through {@link XMLMessageUnitTest#msgToXML} and
+ * {@link XMLMessageUnitTest#msgFromXML}.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+class TestCourier extends MockCourier {
+
+    /** Copies of the delivered messages, in the order they were delivered. */
+    public List<Message> messages = new ArrayList<Message>();
+
+    public TestCourier() {
+        super(true);
+    }
+
+    /**
+     * Stores an XML round-tripped copy of the message in {@link #messages}.
+     * If the round trip throws, the stack trace is printed and the running
+     * test is failed with the exception's message.
+     *
+     * @param message the message being delivered
+     * @return true once the copy has been stored
+     */
+    public boolean deliver(Message message) throws CourierException, MalformedEPRException {
+        final Message copy;
+        try {
+            copy = XMLMessageUnitTest.msgFromXML(XMLMessageUnitTest.msgToXML((MessageImpl) message));
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+            // Only here to satisfy the compiler: Assert.fail always throws.
+            return false;
+        }
+        messages.add(copy);
+        return true;
+    }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/TestCourier.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,43 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
+ <!--
+ Action configurations for a single-level split: one splitter routing to two
+ services, both of which route on to the same aggregator.
+ -->
+
+ <providers>
+ <bus-provider name="null"><bus busid="null"/></bus-provider>
+ </providers>
+
+ <services>
+ <service category="MessageRouting" name="SplitterService" description="Sends messages to N destinations">
+
+ <listeners>
+ <listener name="null-listener" busidref="null" />
+ </listeners>
+
+ <actions>
+
+ <!-- Splitter: routes to test:service1 and test:service2 -->
+ <action name="splitter1-action" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="service1"/>
+ <route-to service-category="test" service-name="service2"/>
+ </property>
+ </action>
+
+ <!-- service1: routes to test:aggregator -->
+ <action name="service1-config" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="aggregator"/>
+ </property>
+ </action>
+
+ <!-- service2: routes to test:aggregator -->
+ <action name="service2-config" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="aggregator"/>
+ </property>
+ </action>
+
+ <!-- Aggregator action with its timeoutInMillies property set to 2000 -->
+ <action name="aggregator-config" class="org.jboss.soa.esb.actions.Aggregator">
+ <property name="timeoutInMillies" value="2000"/>
+ </action>
+
+ </actions>
+ </service>
+ </services>
+</jbossesb>
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-01.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Added: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml (rev 0)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml 2007-11-21 16:52:43 UTC (rev 16733)
@@ -0,0 +1,66 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
+ <!--
+ Action configurations for a nested split: splitter1 routes to service1 and
+ splitter2; splitter2 routes to service2 and service3; service2 and service3
+ route to aggregator1, and service1 routes to aggregator2.
+ -->
+
+ <providers>
+ <bus-provider name="null"><bus busid="null"/></bus-provider>
+ </providers>
+
+ <services>
+ <service category="MessageRouting" name="SplitterService" description="Sends messages to N destinations">
+
+ <listeners>
+ <listener name="null-listener" busidref="null" />
+ </listeners>
+
+ <actions>
+
+ <!-- Outer splitter: routes to test:service1 and test:splitter2 -->
+ <action name="splitter1-action" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="service1"/>
+ <route-to service-category="test" service-name="splitter2"/>
+ </property>
+ </action>
+
+ <!-- Inner splitter: routes to test:service2 and test:service3 -->
+ <action name="splitter2-action" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="service2"/>
+ <route-to service-category="test" service-name="service3"/>
+ </property>
+ </action>
+
+ <!-- service1: routes to test:aggregator2 -->
+ <action name="service1-config" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="aggregator2"/>
+ </property>
+ </action>
+
+ <!-- service2: routes to test:aggregator1 -->
+ <action name="service2-config" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="aggregator1"/>
+ </property>
+ </action>
+
+ <!-- service3: routes to test:aggregator1 -->
+ <action name="service3-config" class="org.jboss.soa.esb.actions.StaticRouter">
+ <property name="destinations">
+ <route-to service-category="test" service-name="aggregator1"/>
+ </property>
+ </action>
+
+ <!-- Aggregators 1 and 2 have no splitId property; only a timeoutInMillies value -->
+ <action name="aggregator1-config" class="org.jboss.soa.esb.actions.Aggregator">
+ <property name="timeoutInMillies" value="200000"/>
+ </action>
+
+ <action name="aggregator2-config" class="org.jboss.soa.esb.actions.Aggregator">
+ <property name="timeoutInMillies" value="200000"/>
+ </action>
+
+ <!-- Used for splitId checking tests -->
+ <!-- The only aggregator here with a splitId property ("splitId1") -->
+ <action name="aggregator3-config" class="org.jboss.soa.esb.actions.Aggregator">
+ <property name="timeoutInMillies" value="200000"/>
+ <property name="splitId" value="splitId1"/>
+ </action>
+
+ </actions>
+ </service>
+ </services>
+</jbossesb>
Property changes on: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/actions/aggregation/action-configs-02.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
Modified: labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java 2007-11-21 16:06:10 UTC (rev 16732)
+++ labs/jbossesb/branches/JBESB_4_2_1_GA_CP/product/rosetta/tests/src/org/jboss/soa/esb/message/tests/XMLMessageUnitTest.java 2007-11-21 16:52:43 UTC (rev 16733)
@@ -22,30 +22,19 @@
package org.jboss.soa.esb.message.tests;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.URL;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
import junit.framework.TestCase;
-
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.PortReference;
import org.jboss.soa.esb.addressing.eprs.*;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.body.content.BytesBody;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.message.format.MessageType;
-import org.w3c.dom.Document;
-import org.xml.sax.InputSource;
+import org.jboss.soa.esb.util.Util;
-import com.sun.org.apache.xml.internal.serialize.OutputFormat;
-import com.sun.org.apache.xml.internal.serialize.XMLSerializer;
+import java.net.URL;
/**
* Unit tests for the Class class.
@@ -550,42 +539,12 @@
public static String msgToXML(final MessageImpl msg)
throws Exception
{
- final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance() ;
-
- factory.setNamespaceAware(true);
-
- final DocumentBuilder builder = factory.newDocumentBuilder() ;
- Document doc = builder.newDocument() ;
-
- doc = msg.toXML(doc) ;
-
- final StringWriter sWriter = new StringWriter() ;
- final OutputFormat format = new OutputFormat() ;
- format.setIndenting(true) ;
-
- final XMLSerializer xmlS = new XMLSerializer(sWriter, format) ;
-
- xmlS.asDOMSerializer() ;
- xmlS.serialize(doc) ;
-
- return sWriter.toString() ;
+ return (String) Util.serialize(msg);
}
public static MessageImpl msgFromXML(final String xmlRepresentation)
throws Exception
{
- final StringReader stringReader = new StringReader(xmlRepresentation) ;
- final InputSource inputSource = new InputSource(stringReader) ;
-
- final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance() ;
-
- factory.setNamespaceAware(true);
-
- final DocumentBuilder builder = factory.newDocumentBuilder() ;
- final Document doc = builder.parse(inputSource) ;
-
- final MessageImpl message = new MessageImpl() ;
- message.fromXML(doc) ;
- return message ;
+ return (MessageImpl) Util.deserialize(xmlRepresentation);
}
}
More information about the jboss-svn-commits
mailing list