[jboss-svn-commits] JBL Code SVN: r10940 - in labs/jbossesb/trunk: product/core/services/src/org/jboss/soa/esb/services/routing and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Apr 12 15:58:40 EDT 2007


Author: kurt.stam at jboss.com
Date: 2007-04-12 15:58:40 -0400 (Thu, 12 Apr 2007)
New Revision: 10940

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
   labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml
   labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
Log:
Allways adding tags, plus allowing for nested split/aggregation.

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	2007-04-12 19:58:40 UTC (rev 10940)
@@ -131,8 +131,26 @@
      */
 	public Message process(Message message) throws ActionProcessingException
 	{
-		String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+        boolean aggregatorTagIsFound=true;
+        int tagCounter=0;
+        while (aggregatorTagIsFound) {
+            if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
+                aggregatorTagIsFound=false;
+                tagCounter--;
+            } else {
+                tagCounter++;
+            }
+        }
+		String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
         if (aggregatorTag!=null) {
+            
+            //Removing the tags and setting them as "the one in current use"
+            message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG + tagCounter);
+            message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTag);
+            Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+            message.getProperties().remove(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+            message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP, splitterTimeStamp);
+            
             String[] tag = aggregatorTag.split(":");
             String uuId = tag[0];
             String messageNumber = tag[1];
@@ -201,9 +219,26 @@
     {
         //Create an aggregated message
         Message aggregatedMessage = MessageFactory.getInstance().getMessage();
-        for (Message oneMessage : messageMap.values()) {
+        boolean isFirstTime=true;
+        for (Message message : messageMap.values()) {
+            //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+            if (isFirstTime) {
+                boolean aggregatorTagIsFound=true;
+                int tagCounter=0;
+                while (aggregatorTagIsFound) {
+                    if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
+                        aggregatorTagIsFound=false;
+                    } else {
+                        String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
+                        Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
+                        message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter, aggregatorTag);
+                        message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter, splitterTimeStamp);
+                        tagCounter++;
+                    }
+                }
+            }
             //Add the individual messages as attachments
-                aggregatedMessage.getAttachment().addItem(oneMessage);
+            aggregatedMessage.getAttachment().addItem(message);
         }
         _aggregatedMessageMap.remove(uuId);
         //TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java	2007-04-12 19:58:40 UTC (rev 10940)
@@ -27,6 +27,9 @@
  */
 package org.jboss.soa.esb.actions;
 
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,6 +43,10 @@
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.persistence.MessageStore;
+import org.jboss.soa.esb.services.persistence.MessageStoreException;
+import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
+import org.jboss.soa.esb.services.persistence.MessageStoreType;
 import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.services.registry.RegistryFactory;
@@ -51,10 +58,6 @@
 {
 	public static final String ROUTE_TO_TAG = "route-to";
 	private Logger log = Logger.getLogger(this.getClass());
-    
-	private ContentBasedRouter()
-	{
-	}
 
 	public ContentBasedRouter(ConfigTree config) throws ConfigurationException, RegistryException, MessageRouterException
 	{
@@ -62,14 +65,19 @@
 		checkMyParms();
 		_registry = RegistryFactory.getRegistry();
         _cbr = ContentBasedRouterFactory.getRouter();
-	} // ________________________________
-
+	}
+	/** Router the message to one or more destinations, using the ContentBasedRouter to figure out
+     *  to which destinations it is going to be routed too.
+     *  
+     * @param message
+     * @return
+     * @throws MalformedEPRException
+     * @throws RegistryException
+     * @throws CourierException
+     * @throws MessageRouterException
+	 */
 	public Message process(Message message) throws MalformedEPRException, RegistryException, CourierException, MessageRouterException
 	{
-		//Call call = message.getHeader().getCall();
-		//if (null == call || null == call.getMessageID())
-		//	throw new IllegalArgumentException("Null message ID");
-
         List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage, _ruleReload, message);
         Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
 		for (String destination : destinations) {
@@ -78,40 +86,35 @@
             }
 		}
         if (outgoingDestinations.size()>0) {
-            MessageRouter.deliverMessages(outgoingDestinations, message, false); 
+            MessageRouter.deliverMessages(outgoingDestinations, message); 
         } else if (destinations.size() > 0) {
             log.error("The rule destination(s) " + destinations 
                     + " are  not in found in the destination names in the configuration "
-                    + _destinations.keySet() + ". Please fix your configuration."); 
+                    + _destinations.keySet() + ". Please fix your configuration.");
+            //TODO send it to a Dead Letter Service rather then hard coding it to go to the message store.
+            MessageStore store = MessageStoreFactory.getInstance().getMessageStore(MessageStoreType.DEFAULT_TYPE);
+            assertEquals((store != null), true);
+            try {
+                URI uid = store.addMessage(message);
+                store.setUndelivered(uid);
+            } catch (MessageStoreException mse) {
+                log.error("Could not store undeliverable message.", mse);
+            }
         }
-        
 		return message;
-	} // ________________________________
+	}
     
+    /** 
+     * @deprecated no longer needed, leaving this in here for backwards compatibility,
+     * use the default "process".
+     * */
     public Message split(Message message) throws MalformedEPRException, RegistryException, CourierException, MessageRouterException
     {
-        //Call call = message.getHeader().getCall();
-        //if (null == call || null == call.getMessageID())
-        //  throw new IllegalArgumentException("Null message ID");
+        log.warn("Depricated, please use the default 'process' method in your configuration");
+        return process(message);
+    }
+    
 
-        List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage, _ruleReload, message);
-        Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
-        for (String destination : destinations) {
-            if (_destinations.containsKey(destination)) {
-                outgoingDestinations.add(_destinations.get(destination));
-            }
-        }
-        if (outgoingDestinations.size()>0) {
-            MessageRouter.deliverMessages(outgoingDestinations, message, true); 
-        } else if (destinations.size() > 0) {
-            log.error("The rule destination(s) " + destinations 
-                    + " are  not in found in the destination names in the configuration "
-                    + _destinations.keySet() + ". Please fix your configuration."); 
-        }
-        
-        return message;
-    } // ________________________________
-
 	protected void checkMyParms() throws ConfigurationException
 	{
         if (_config.getAttribute(ListenerTagNames.RULE_SET_TAG)==null) {
@@ -152,21 +155,21 @@
 			}
         }
         
-	} // ________________________________
+	}
 
 	protected ConfigTree _config;
 
 	protected Map<String, String[]> _destinations;
     
-    private String _ruleSet;
+    protected String _ruleSet;
     
-    private String _ruleLanguage;
+    protected String _ruleLanguage;
     
-    private boolean _ruleReload;
+    protected boolean _ruleReload;
 
 	protected Registry _registry;
     
-    private org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter _cbr;
+    protected org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter _cbr;
 
 	protected static Logger _logger = Logger.getLogger(ContentBasedRouter.class);
 

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java	2007-04-12 19:58:40 UTC (rev 10940)
@@ -32,8 +32,6 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
@@ -44,70 +42,60 @@
 public class StaticRouter extends AbstractActionPipelineProcessor
 {
 	public static final String ROUTE_TO_TAG = "route-to";
+    private Logger log = Logger.getLogger(this.getClass());
 
-	private StaticRouter()
-	{
-	}
+	private StaticRouter(){}
 
 	public StaticRouter(ConfigTree config) throws ConfigurationException, RegistryException
 	{
 		_config = config;
-	} // ________________________________
-
+	}
+	/**
+     * Routes the message to one or more destonations.
+	 */
 	public Message process(Message message) throws ActionProcessingException
 	{
-		try
-		{
-			MessageRouter.deliverMessages(_destinations, message, false);
+		try {
+			MessageRouter.deliverMessages(_destinations, message);
 			return message;
-		}
-		catch (MessageRouterException ex)
-		{
+		} catch (MessageRouterException ex) {
 			throw new ActionProcessingException(ex);
-                }
-	} // ________________________________
-        public void initialise() throws ActionLifecycleException
-        {
-		_destinations = new ArrayList<String[]>();
-		ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
-		if (null == destList || destList.length < 1)
-		{
-			_logger.warn("Missing or empty destination list - This action class won't have any effect");
-			return;
-		}
-		for (ConfigTree curr : destList)
-			try
-			{
-				String category = curr.getAttribute(
-						ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
-				String name = curr.getRequiredAttribute(
-						ListenerTagNames.SERVICE_NAME_TAG);
-				_destinations.add(new String[]
-				{ category, name});
-			}
-			catch (Exception e)
-			{
-				throw new ActionLifecycleException(
-						"Problems with destination list", e);
-			}
-	} // ________________________________
-    
-    public Message split(Message message) throws MalformedEPRException, RegistryException, CourierException
+        }
+	}
+    /** 
+     * @deprecated no longer needed, leaving this in here for backwards compatibility,
+     * use the default "process".
+     * */
+    public Message split(Message message) throws ActionProcessingException 
     {
-        try
+        log.warn("Depricated, please use the default 'process' method in your configuration");
+        return process(message);
+    }
+    /**
+     * Initialization by reading the configuration.
+     */
+    public void initialise() throws ActionLifecycleException
+    {
+        _destinations = new ArrayList<String[]>();
+        ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
+        if (null == destList || destList.length < 1)
         {
-        //  Call call = message.getHeader().getCall();
-        //  if (null == call || null == call.getMessageID())
-        //      throw new IllegalArgumentException("Null message ID");
-    
-            MessageRouter.deliverMessages(_destinations, message, true);
-            return message;
+        	_logger.warn("Missing or empty destination list - This action class won't have any effect");
+        	return;
         }
-        catch (MessageRouterException ex)
-        {
-            throw new MalformedEPRException(ex);
+        for (ConfigTree curr : destList) {
+        	try {
+        		String category = curr.getAttribute(
+        				ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
+        		String name = curr.getRequiredAttribute(
+        				ListenerTagNames.SERVICE_NAME_TAG);
+        		_destinations.add(new String[]{ category, name});
+        	} catch (Exception e) {
+        		throw new ActionLifecycleException(
+        				"Problems with destination list", e);
+        	}
         }
-    } // ________________________________
+    }
 
 	protected ConfigTree _config;
 

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-12 19:58:40 UTC (rev 10940)
@@ -76,20 +76,32 @@
      *            aggregation purposes.
 	 */
 	public synchronized static void deliverMessages(
-			Collection<String[]> destinations, Message message, boolean isSplitter) throws MessageRouterException
+			Collection<String[]> destinations, Message message) throws MessageRouterException
 	{
         String uuId = UUID.randomUUID().toString();
         int counter=0;
+        boolean aggregatorTagIsFound=true;
+        int tagCounter=0;
+        while (aggregatorTagIsFound) {
+            if (message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter)==null){
+                aggregatorTagIsFound=false;
+                if (tagCounter>0) tagCounter--;
+            } else {
+                tagCounter++;
+            }
+        }
 		for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
 		{
-            if (isSplitter) {
-                message.getProperties().setProperty(AGGEGRATOR_TAG, uuId + ":" + ++counter + ":" + destinations.size());
-                message.getProperties().setProperty(SPLITTER_TIME_STAMP, new java.util.Date().getTime());
+            //Only put tags on when routing to more then 1 destination
+            if (destinations.size()>1) {
+                message.getProperties().setProperty(AGGEGRATOR_TAG + tagCounter, uuId + ":" + ++counter + ":" + destinations.size());
+                message.getProperties().setProperty(SPLITTER_TIME_STAMP + tagCounter, new java.util.Date().getTime());
                 if (logger.isDebugEnabled()) {
-                    String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG);
-                    logger.debug(AGGEGRATOR_TAG + "=" + tag);
+                    String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter);
+                    logger.debug(AGGEGRATOR_TAG+ tagCounter + "=" + tag);
                 }
             }
+         
             String[] destination = (String[]) i.next();
 			String category = destination[0];
 			String serviceName = destination[1];

Modified: labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/product/samples/quickstarts/aggregator/inbound_splitter/jbossesb.xml	2007-04-12 19:58:40 UTC (rev 10940)
@@ -34,7 +34,7 @@
              <action name="print-before" class="org.jboss.soa.esb.actions.SystemPrintln">
                  <property name="message" value="Hello Splitting Router" />
              </action>
-             <action process="split" class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
+             <action class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
                  <property name="destinations"> <!-- process="split" is the trick -->        		      
         	        <route-to destination-name="red"   service-category="RedTeam"   service-name="GoRed" /> 
         	        <route-to destination-name="blue"   service-category="BlueTeam"   service-name="GoBlue" /> 

Modified: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java	2007-04-12 19:04:33 UTC (rev 10939)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java	2007-04-12 19:58:40 UTC (rev 10940)
@@ -28,8 +28,6 @@
 import java.io.InputStream;
 import java.sql.DriverManager;
 import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.Properties;
 
 import junit.framework.JUnit4TestAdapter;
@@ -126,16 +124,10 @@
 		msg.getBody().setContents(body.getBytes());
 		
 		Registry registry = RegistryFactory.getRegistry();
-		Collection<EPR> eprs = registry.findEPRs(SERVICE_CATEGORY_NAME,
+		EPR epr = registry.findEPR(SERVICE_CATEGORY_NAME,
 				SERVICE_NAME);
-		for (Iterator<EPR> eprIterator = eprs.iterator(); eprIterator.hasNext();)
-		{
-			// Just use the first EPR in the list.
-			EPR epr = eprIterator.next();
-			Courier courier = CourierFactory.getCourier(epr);
-			courier.deliver(msg);
-			break;
-		}
+		Courier courier = CourierFactory.getCourier(epr);
+		courier.deliver(msg);
 	}
 
 	public static junit.framework.Test suite()
@@ -257,7 +249,7 @@
 	@AfterClass
 	public static void runAfterAllTests() throws Exception
 	{
-        Thread.sleep(1000);
+        Thread.sleep(2000);
 		// Increase Sleep for debugging
 		_boot.requestEnd();
 		// Give the esb time to finish




More information about the jboss-svn-commits mailing list