[jboss-svn-commits] JBL Code SVN: r11361 - in labs/jbossesb/trunk/product/core: services/src/org/jboss/soa/esb/services/routing and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Apr 26 11:58:19 EDT 2007


Author: kurt.stam at jboss.com
Date: 2007-04-26 11:58:18 -0400 (Thu, 26 Apr 2007)
New Revision: 11361

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
Simplifying tags with using array

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	2007-04-26 15:43:52 UTC (rev 11360)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	2007-04-26 15:58:18 UTC (rev 11361)
@@ -68,6 +68,7 @@
  */
 public class Aggregator extends AbstractActionPipelineProcessor
 {
+    public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
     private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
         = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
     private TimeoutChecker _timeoutChecker=null;
@@ -129,33 +130,27 @@
      * @return a aggregated message, or null if the aggregation has not been completed.
      * @throws ActionProcessingException
      */
-	public Message process(Message message) throws ActionProcessingException
+	@SuppressWarnings("unchecked")
+    public Message process(Message message) throws ActionProcessingException
 	{
-        boolean aggregatorTagIsFound=true;
-        int tagCounter=0;
-        while (aggregatorTagIsFound) {
-            if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
-                aggregatorTagIsFound=false;
-                tagCounter--;
+        ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+		
+        if (aggregatorTags!=null && aggregatorTags.size()>0) {
+            String aggregatorTag = (String) aggregatorTags.get(aggregatorTags.size()-1);
+            //Removing the last tags and setting them as "the one in current use"
+            if (aggregatorTags.size()>1) {
+                aggregatorTags.remove(aggregatorTags.size()-1);
+                message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTags);
             } else {
-                tagCounter++;
+                message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG);
             }
-        }
-		String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
-        if (aggregatorTag!=null) {
-            
-            //Removing the tags and setting them as "the one in current use"
-            message.getProperties().remove(MessageRouter.AGGEGRATOR_TAG + tagCounter);
-            message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTag);
-            Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
-            message.getProperties().remove(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
-            message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP, splitterTimeStamp);
-            
             String[] tag = aggregatorTag.split(":");
             String uuId = tag[0];
             String messageNumber = tag[1];
             int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
-             
+            Long splitterTimeStamp = Long.valueOf(tag[3]);
+            message.getProperties().setProperty(SPLITTER_TIME_STAMP, splitterTimeStamp);
+
             if (isTimedOut(message)) {
                 if (_aggregatedMessageMap.containsKey(uuId)) {
                     ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
@@ -222,20 +217,13 @@
         boolean isFirstTime=true;
         for (Message message : messageMap.values()) {
             //Push additional AggregatorTags onto the new message, so we can aggregate in case of nested splits.
+            //Only need to get it from the first message, should be the same for the others.
             if (isFirstTime) {
-                boolean aggregatorTagIsFound=true;
-                int tagCounter=0;
-                while (aggregatorTagIsFound) {
-                    if (message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter)==null){
-                        aggregatorTagIsFound=false;
-                    } else {
-                        String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter);
-                        Long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter);
-                        message.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG + tagCounter, aggregatorTag);
-                        message.getProperties().setProperty(MessageRouter.SPLITTER_TIME_STAMP + tagCounter, splitterTimeStamp);
-                        tagCounter++;
-                    }
+                ArrayList aggregatorTags = (ArrayList) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+                if (aggregatorTags!=null && aggregatorTags.size()>0) {
+                    aggregatedMessage.getProperties().setProperty(MessageRouter.AGGEGRATOR_TAG, aggregatorTags);
                 }
+                isFirstTime=false;
             }
             //Add the individual messages as attachments
             aggregatedMessage.getAttachment().addItem(message);
@@ -257,7 +245,7 @@
      */
     private boolean isTimedOut(Message message) 
     {
-        long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP);
+        long splitterTimeStamp = (Long) message.getProperties().getProperty(SPLITTER_TIME_STAMP);
         if (timeoutInMillies!=null) {
             long now = new Date().getTime();
             long expiration = splitterTimeStamp + Long.valueOf(timeoutInMillies);

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-26 15:43:52 UTC (rev 11360)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-26 15:58:18 UTC (rev 11361)
@@ -23,6 +23,7 @@
 
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -75,30 +76,27 @@
      * @param boolean - isSpitter, if true will Tag the messages for
      *            aggregation purposes.
 	 */
-	public synchronized static void deliverMessages(
+    @SuppressWarnings("unchecked")
+    public synchronized static void deliverMessages(
 			Collection<String[]> destinations, Message message) throws MessageRouterException
 	{
         String uuId = UUID.randomUUID().toString();
         int counter=0;
-        boolean aggregatorTagIsFound=true;
-        int tagCounter=0;
-        while (aggregatorTagIsFound) {
-            if (message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter)==null){
-                aggregatorTagIsFound=false;
-                if (tagCounter>0) tagCounter--;
-            } else {
-                tagCounter++;
-            }
-        }
+      
+       
 		for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
 		{
             //Only put tags on when routing to more then 1 destination
             if (destinations.size()>1) {
-                message.getProperties().setProperty(AGGEGRATOR_TAG + tagCounter, uuId + ":" + ++counter + ":" + destinations.size());
-                message.getProperties().setProperty(SPLITTER_TIME_STAMP + tagCounter, new java.util.Date().getTime());
+                ArrayList<String> aggregatorTags = (ArrayList<String>) message.getProperties().getProperty(AGGEGRATOR_TAG);
+                if (aggregatorTags==null) {
+                    aggregatorTags = new ArrayList<String>();
+                }
+                String tag = uuId + ":" + ++counter + ":" + destinations.size() + ":" + new java.util.Date().getTime();
+                aggregatorTags.add(tag);
+                message.getProperties().setProperty(AGGEGRATOR_TAG, aggregatorTags);
                 if (logger.isDebugEnabled()) {
-                    String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG + tagCounter);
-                    logger.debug(AGGEGRATOR_TAG+ tagCounter + "=" + tag);
+                    logger.debug(AGGEGRATOR_TAG+ "=" + tag);
                 }
             }
          




More information about the jboss-svn-commits mailing list