[jboss-svn-commits] JBL Code SVN: r10940 - in labs/jbossesb/trunk: product/core/services/src/org/jboss/soa/esb/services/routing and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Apr 12 15:58:40 EDT 2007
Author: kurt.stam at jboss.com
Date: 2007-04-12 15:58:40 -0400 (Thu, 12 Apr 2007)
New Revision: 10940
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
Log:
Allways adding tags, plus allowing for nested split/aggregation.
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java 2007-04-12 19:58:40 UTC (rev 10940)
@@ -131,8 +131,26 @@
*/
public Message process(Message message) throws ActionProcessingException
{
- String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+ boolean aggregatorTagIsFound=true;
+ int tagCounter=0;
+ while (aggregatorTagIsFound) {
+ if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
+ aggregatorTagIsFound=false;
+ tagCounter--;
+ } else {
+ tagCounter++;
+ }
+ }
+ String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
if (aggregatorTag!=null) {
+
+ //Removing the tags and setting them as "the one in current use"
+ message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG + tagCounter);
+ message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTag);
+ Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+ message.getProperties().remove(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+ message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP, splitterTimeStamp);
+
String[] tag = aggregatorTag.split(":");
String uuId = tag[0];
String messageNumber = tag[1];
@@ -201,9 +219,26 @@
{
//Create an aggregated message
Message aggregatedMessage = MessageFactory.getInstance().getMessage();
- for (Message oneMessage : messageMap.values()) {
+ boolean isFirstTime=true;
+ for (Message message : messageMap.values()) {
+ //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+ if (isFirstTime) {
+ boolean aggregatorTagIsFound=true;
+ int tagCounter=0;
+ while (aggregatorTagIsFound) {
+ if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
+ aggregatorTagIsFound=false;
+ } else {
+ String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
+ Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+ message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter, aggregatorTag);
+ message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter, splitterTimeStamp);
+ tagCounter++;
+ }
+ }
+ }
//Add the individual messages as attachments
- aggregatedMessage.getAttachment().addItem(oneMessage);
+ aggregatedMessage.getAttachment().addItem(message);
}
_aggregatedMessageMap.remove(uuId);
//TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java 2007-04-12 19:58:40 UTC (rev 10940)
@@ -27,6 +27,9 @@
*/
package org.jboss.soa.esb.actions;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -40,6 +43,10 @@
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.persistence.MessageStore;
+import org.jboss.soa.esb.services.persistence.MessageStoreException;
+import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
+import org.jboss.soa.esb.services.persistence.MessageStoreType;
import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.services.registry.RegistryFactory;
@@ -51,10 +58,6 @@
{
public static final String ROUTE_TO_TAG = "route-to";
private Logger log = Logger.getLogger(this.getClass());
-
- private ContentBasedRouter()
- {
- }
public ContentBasedRouter(ConfigTree config) throws ConfigurationException, RegistryException, MessageRouterException
{
@@ -62,14 +65,19 @@
checkMyParms();
_registry = RegistryFactory.getRegistry();
_cbr = ContentBasedRouterFactory.getRouter();
- } // ________________________________
-
+ }
+ /** Router the message to one or more destinations, using the ContentBasedRouter to figure out
+ * to which destinations it is going to be routed too.
+ *
+ * @param message
+ * @return
+ * @throws MalformedEPRException
+ * @throws RegistryException
+ * @throws CourierException
+ * @throws MessageRouterException
+ */
public Message process(Message message) throws MalformedEPRException, RegistryException, CourierException, MessageRouterException
{
- //Call call = message.getHeader().getCall();
- //if (null == call || null == call.getMessageID())
- // throw new IllegalArgumentException("Null message ID");
-
List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage, _ruleReload, message);
Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
for (String destination : destinations) {
@@ -78,40 +86,35 @@
}
}
if (outgoingDestinations.size()>0) {
- MessageRouter.deliverMessages(outgoingDestinations, message, false);
+ MessageRouter.deliverMessages(outgoingDestinations, message);
} else if (destinations.size() > 0) {
log.error("The rule destination(s) " + destinations
+ " are not in found in the destination names in the configuration "
- + _destinations.keySet() + ". Please fix your configuration.");
+ + _destinations.keySet() + ". Please fix your configuration.");
+ //TODO send it to a Dead Letter Service rather then hard coding it to go to the message store.
+ MessageStore store = MessageStoreFactory.getInstance().getMessageStore(MessageStoreType.DEFAULT_TYPE);
+ assertEquals((store != null), true);
+ try {
+ URI uid = store.addMessage(message);
+ store.setUndelivered(uid);
+ } catch (MessageStoreException mse) {
+ log.error("Could not store undeliverable message.", mse);
+ }
}
-
return message;
- } // ________________________________
+ }
+ /**
+ * @deprecated no longer needed, leaving this in here for backwards compatibility,
+ * use the default "process".
+ * */
public Message split(Message message) throws MalformedEPRException, RegistryException, CourierException, MessageRouterException
{
- //Call call = message.getHeader().getCall();
- //if (null == call || null == call.getMessageID())
- // throw new IllegalArgumentException("Null message ID");
+ log.warn("Depricated, please use the default 'process' method in your configuration");
+ return process(message);
+ }
+
- List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage, _ruleReload, message);
- Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
- for (String destination : destinations) {
- if (_destinations.containsKey(destination)) {
- outgoingDestinations.add(_destinations.get(destination));
- }
- }
- if (outgoingDestinations.size()>0) {
- MessageRouter.deliverMessages(outgoingDestinations, message, true);
- } else if (destinations.size() > 0) {
- log.error("The rule destination(s) " + destinations
- + " are not in found in the destination names in the configuration "
- + _destinations.keySet() + ". Please fix your configuration.");
- }
-
- return message;
- } // ________________________________
-
protected void checkMyParms() throws ConfigurationException
{
if (_config.getAttribute(ListenerTagNames.RULE_SET_TAG)==null) {
@@ -152,21 +155,21 @@
}
}
- } // ________________________________
+ }
protected ConfigTree _config;
protected Map<String, String[]> _destinations;
- private String _ruleSet;
+ protected String _ruleSet;
- private String _ruleLanguage;
+ protected String _ruleLanguage;
- private boolean _ruleReload;
+ protected boolean _ruleReload;
protected Registry _registry;
- private org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter _cbr;
+ protected org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter _cbr;
protected static Logger _logger = Logger.getLogger(ContentBasedRouter.class);
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java 2007-04-12 19:58:40 UTC (rev 10940)
@@ -32,8 +32,6 @@
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
@@ -44,70 +42,60 @@
public class StaticRouter extends AbstractActionPipelineProcessor
{
public static final String ROUTE_TO_TAG = "route-to";
+ private Logger log = Logger.getLogger(this.getClass());
- private StaticRouter()
- {
- }
+ private StaticRouter(){}
public StaticRouter(ConfigTree config) throws ConfigurationException, RegistryException
{
_config = config;
- } // ________________________________
-
+ }
+ /**
+ * Routes the message to one or more destonations.
+ */
public Message process(Message message) throws ActionProcessingException
{
- try
- {
- MessageRouter.deliverMessages(_destinations, message, false);
+ try {
+ MessageRouter.deliverMessages(_destinations, message);
return message;
- }
- catch (MessageRouterException ex)
- {
+ } catch (MessageRouterException ex) {
throw new ActionProcessingException(ex);
- }
- } // ________________________________
- public void initialise() throws ActionLifecycleException
- {
- _destinations = new ArrayList<String[]>();
- ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
- if (null == destList || destList.length < 1)
- {
- _logger.warn("Missing or empty destination list - This action class won't have any effect");
- return;
- }
- for (ConfigTree curr : destList)
- try
- {
- String category = curr.getAttribute(
- ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
- String name = curr.getRequiredAttribute(
- ListenerTagNames.SERVICE_NAME_TAG);
- _destinations.add(new String[]
- { category, name});
- }
- catch (Exception e)
- {
- throw new ActionLifecycleException(
- "Problems with destination list", e);
- }
- } // ________________________________
-
- public Message split(Message message) throws MalformedEPRException, RegistryException, CourierException
+ }
+ }
+ /**
+ * @deprecated no longer needed, leaving this in here for backwards compatibility,
+ * use the default "process".
+ * */
+ public Message split(Message message) throws ActionProcessingException
{
- try
+ log.warn("Depricated, please use the default 'process' method in your configuration");
+ return process(message);
+ }
+ /**
+ * Initialization by reading the configuration.
+ */
+ public void initialise() throws ActionLifecycleException
+ {
+ _destinations = new ArrayList<String[]>();
+ ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
+ if (null == destList || destList.length < 1)
{
- // Call call = message.getHeader().getCall();
- // if (null == call || null == call.getMessageID())
- // throw new IllegalArgumentException("Null message ID");
-
- MessageRouter.deliverMessages(_destinations, message, true);
- return message;
+ _logger.warn("Missing or empty destination list - This action class won't have any effect");
+ return;
}
- catch (MessageRouterException ex)
- {
- throw new MalformedEPRException(ex);
+ for (ConfigTree curr : destList) {
+ try {
+ String category = curr.getAttribute(
+ ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
+ String name = curr.getRequiredAttribute(
+ ListenerTagNames.SERVICE_NAME_TAG);
+ _destinations.add(new String[]{ category, name});
+ } catch (Exception e) {
+ throw new ActionLifecycleException(
+ "Problems with destination list", e);
+ }
}
- } // ________________________________
+ }
protected ConfigTree _config;
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-12 19:58:40 UTC (rev 10940)
@@ -76,20 +76,32 @@
* aggregation purposes.
*/
public synchronized static void deliverMessages(
- Collection<String[]> destinations, Message message, boolean isSplitter) throws MessageRouterException
+ Collection<String[]> destinations, Message message) throws MessageRouterException
{
String uuId = UUID.randomUUID().toString();
int counter=0;
+ boolean aggregatorTagIsFound=true;
+ int tagCounter=0;
+ while (aggregatorTagIsFound) {
+ if (message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter)==null){
+ aggregatorTagIsFound=false;
+ if (tagCounter>0) tagCounter--;
+ } else {
+ tagCounter++;
+ }
+ }
for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
{
- if (isSplitter) {
- message.getProperties().setProperty(AGGEGRATOR_TAG, uuId + ":" + ++counter + ":" + destinations.size());
- message.getProperties().setProperty(SPLITTER_TIME_STAMP, new java.util.Date().getTime());
+ //Only put tags on when routing to more then 1 destination
+ if (destinations.size()>1) {
+ message.getProperties().setProperty(AGGEGRATOR_TAG + tagCounter, uuId + ":" + ++counter + ":" + destinations.size());
+ message.getProperties().setProperty(SPLITTER_TIME_STAMP + tagCounter, new java.util.Date().getTime());
if (logger.isDebugEnabled()) {
- String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG);
- logger.debug(AGGEGRATOR_TAG + "=" + tag);
+ String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter);
+ logger.debug(AGGEGRATOR_TAG+ tagCounter + "=" + tag);
}
}
+
String[] destination = (String[]) i.next();
String category = destination[0];
String serviceName = destination[1];
Modified: labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml 2007-04-12 19:58:40 UTC (rev 10940)
@@ -34,7 +34,7 @@
<action name="print-before" class="org.jboss.soa.esb.actions.SystemPrintln">
<property name="message" value="Hello Splitting Router" />
</action>
- <action process="split" class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
+ <action class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
<property name="destinations"> <!-- process="split" is the trick -->
<route-to destination-name="red" service-category="RedTeam" service-name="GoRed" />
<route-to destination-name="blue" service-category="BlueTeam" service-name="GoBlue" />
Modified: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java 2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java 2007-04-12 19:58:40 UTC (rev 10940)
@@ -28,8 +28,6 @@
import java.io.InputStream;
import java.sql.DriverManager;
import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.Properties;
import junit.framework.JUnit4TestAdapter;
@@ -126,16 +124,10 @@
msg.getBody().setContents(body.getBytes());
Registry registry = RegistryFactory.getRegistry();
- Collection<EPR> eprs = registry.findEPRs(SERVICE_CATEGORY_NAME,
+ EPR epr = registry.findEPR(SERVICE_CATEGORY_NAME,
SERVICE_NAME);
- for (Iterator<EPR> eprIterator = eprs.iterator(); eprIterator.hasNext();)
- {
- // Just use the first EPR in the list.
- EPR epr = eprIterator.next();
- Courier courier = CourierFactory.getCourier(epr);
- courier.deliver(msg);
- break;
- }
+ Courier courier = CourierFactory.getCourier(epr);
+ courier.deliver(msg);
}
public static junit.framework.Test suite()
@@ -257,7 +249,7 @@
@AfterClass
public static void runAfterAllTests() throws Exception
{
- Thread.sleep(1000);
+ Thread.sleep(2000);
// Increase Sleep for debugging
_boot.requestEnd();
// Give the esb time to finish
More information about the jboss-svn-commits
mailing list