[jboss-svn-commits] JBL Code SVN: r38385 - in labs/jbossesb/trunk/product: rosetta/tests/src/org/jboss/soa/esb/actions and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Jul 18 10:39:18 EDT 2013


Author: tcunning
Date: 2013-07-18 10:39:18 -0400 (Thu, 18 Jul 2013)
New Revision: 38385

Added:
   labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/
   labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java
   labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java
   labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
Log:
JBESB-3926
Check in Kevin's fix for FileStreamSplitter + tests.


Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java	2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java	2013-07-18 14:39:18 UTC (rev 38385)
@@ -182,11 +182,15 @@
                     previousEntry.setNextEntry(entry) ;
                 }
             }
+            entry.lock() ;
             if (!entry.hasExpired())
             {
-                entry.lock() ;
                 return entry ;
             }
+            else
+            {
+                entry.unlock() ;
+            }
         }
     }
 
@@ -235,7 +239,7 @@
         private int aggregateCount ;
         private Message message ;
 
-        private Lock lock = new ReentrantLock() ;
+        private final Lock lock = new ReentrantLock() ;
         private AggregatorEntry previousEntry ;
         private AggregatorEntry nextEntry ;
         

Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java	2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java	2013-07-18 14:39:18 UTC (rev 38385)
@@ -27,21 +27,15 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.InputStream;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.UUID;
 
 import junit.framework.JUnit4TestAdapter;
 
 import org.apache.log4j.Logger;
-import org.jboss.internal.soa.esb.couriers.MockCourier;
-import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
-import org.jboss.internal.soa.esb.services.registry.MockRegistry;
 
-import org.jboss.soa.esb.actions.aggregator.AbstractAggregator;	
 import org.jboss.soa.esb.actions.aggregator.StreamingAggregator;
 import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
-import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.format.MessageFactory;
@@ -63,13 +57,6 @@
 	static Logger logger = Logger.getLogger(StreamingAggregatorUnitTest.class);
     static Message message = null;
     
-    private static EPR epr1;
-    private static EPR epr2;
-    private static EPR epr3;
-    private static MockCourier courier1;
-    private static MockCourier courier2;
-    private static MockCourier courier3;
-    
     private static ConfigTree[] actions;
 	
     public static junit.framework.Test suite()
@@ -79,20 +66,6 @@
     
     @BeforeClass
     public static void before() throws Exception {
-        MockCourierFactory.install();
-        MockRegistry.install();
-
-        epr1 = new EPR(new URI("test1"));
-        epr2 = new EPR(new URI("test2"));
-        epr3 = new EPR(new URI("DLS"));
-        courier1 = new MockCourier(true);
-        courier2 = new MockCourier(true);
-        courier3 = new MockCourier(true);
-
-        MockRegistry.register("test", "java", epr1, courier1);
-        MockRegistry.register("test", "xml", epr2, courier2);
-        MockRegistry.register("test", "aggregator", epr3, courier3);
- 
         message = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
         message.getBody().add(("Hello Aggregator"));
         
@@ -188,4 +161,48 @@
         }
     }
     
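+    /**
+     * Exercises out-of-sequence delivery of the final message, i.e. the message
+     * carrying the sequence count as created by the Smooks FileStreamSplitter.
+     * It is sent at each position of a series in turn; only the last message
+     * processed in a series may yield an aggregated result.
+     * @throws Exception on any unexpected failure while initialising or running the aggregator
+     */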
+    @Test
+    public void aggregateStreamingTest()
+        throws Exception
+    {
+        final StreamingAggregator aggregator = new StreamingAggregator(actions[0]);
+        aggregator.initialise();
+        final int recipientCount = 3; // three messages plus the final message containing the sequence count
+        for(int posn = 0 ; posn <= recipientCount; posn++)
+        {
+            int count = 1;
+            final String uuId = UUID.randomUUID().toString();
+        
+            for(int idx = 0; idx <= recipientCount; idx++) 
+            {
+                if (idx != posn)
+                {
+                    message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, 
+                            new AggregateDetails(uuId, count++));
+                }
+                else
+                {
+                    message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, 
+                            new AggregateDetails(uuId, 0));
+                    ((AggregateDetails)message.getProperties().getProperty(AggregateDetails.AGGREGATE_DETAILS)).setSequenceCount(Integer.valueOf(recipientCount+1));
+                }
+                final Message responseMessage = aggregator.process(message);
+                if (idx < recipientCount)
+                {
+                    assertNull("Message returned early", responseMessage);
+                }
+                else
+                {
+                    assertNotNull("No aggregated message returned", responseMessage);
+                }
+            }
+        }
+    }
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java	2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java	2013-07-18 14:39:18 UTC (rev 38385)
@@ -71,7 +71,7 @@
    
         	if (sequenceCount.intValue() > 0) {
         		AggregateDetails aggDetails = new AggregateDetails(seriesUUID, new Integer(0));
-        		aggDetails.setSequenceCount(sequenceCount);
+        		aggDetails.setSequenceCount(sequenceCount+1);
         		message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, 
         				aggDetails);
         	}

Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java	2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.util.Util;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+*/
+class AggregationTestCourier extends MockCourier {
+
+    public List<Message> messages = new ArrayList<Message>();
+
+    public AggregationTestCourier() {
+        super(true);
+    }
+
+    public static MessageImpl msgFromXML(final String xmlRepresentation)
+         throws Exception
+    {
+      return (MessageImpl) Util.deserialize(xmlRepresentation);
+    }
+
+    public static String msgToXML(final MessageImpl msg)
+         throws Exception
+    {
+      return (String) Util.serialize(msg);
+    }
+
+    public boolean deliver(Message message) throws CourierException, MalformedEPRException {
+        try {
+            String xmlRepresentation = msgToXML((MessageImpl)message);
+            messages.add(msgFromXML(xmlRepresentation));
+            return true;
+        } catch(Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+        return false;
+    }
+}

Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java	2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,140 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.actions.aggregator.StreamingAggregator;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.smooks.splitting.FileStreamSplitter;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStreamSplitterAggregationUnitTest
+{
+    private AggregationTestCourier testCourier = new AggregationTestCourier() ;
+
+    @Before
+    public void setUp()
+            throws Exception
+    {
+        MockCourierFactory.install() ;
+        MockRegistry.install() ;
+
+        MockRegistry.register("test", "aggregator", testCourier) ;
+    }
+
+    @Test
+    public void testStreamSplitAggregation()
+        throws Exception
+    {
+        final int numEntries = 6 ;
+        final File inputFile = createTestFile(numEntries) ;
+        try
+        {
+            final FileStreamSplitter<File> splitter = new FileStreamSplitter<File>() ;
+            final ConfigTree config = new ConfigTree("test") ;
+            config.setAttribute("splitterConfig", getClass().getPackage().getName().replace('.', '/') + "/smooks-config.xml") ;
+            splitter.setConfiguration(config) ;
+            
+            final Message lastMessage = splitter.compose(inputFile) ;
+            
+            final List<Message> messages = testCourier.messages ;
+            messages.add(0, lastMessage) ;
+            
+            final int numMessages = messages.size() ;
+            assertEquals("Expected message count", numEntries + 1, numMessages) ;
+            
+            final StreamingAggregator aggregator = new StreamingAggregator(new ConfigTree("aggregator")) ;
+            aggregator.initialise() ;
+            
+            for(int count = 0 ; count <= numEntries ; count++)
+            {
+                final Message result = aggregator.process(messages.get(count)) ;
+                if (count < numEntries)
+                {
+                    assertNull("Message returned early", result) ;
+                }
+                else
+                {
+                    assertNotNull("No aggregated message returned", result) ;
+                }
+            }
+        }
+        finally
+        {
+            inputFile.delete() ;
+        }
+    }
+    
+    private File createTestFile(final int numEntries)
+        throws Exception
+    {
+        final File file = File.createTempFile("stream", "test") ;
+        final PrintWriter pw = new PrintWriter(file) ;
+        try
+        {
+            for(int count = 0 ; count < numEntries ; count++)
+            {
+                pw.println("row" + count) ;
+            }
+        }
+        finally
+        {
+            pw.close() ;
+        }
+        return file ;
+    }
+    @After
+    public void tearDown()
+            throws Exception
+    {
+        testCourier.messages.clear() ;
+        MockRegistry.uninstall();
+        MockCourierFactory.uninstall();
+    }
+
+    private void writeToFile(final String content, final File to)
+        throws IOException
+    {
+        final FileWriter writer = new FileWriter(to) ;
+        writer.write(content) ;
+        writer.close() ;
+    }
+    
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(FileStreamSplitterAggregationUnitTest.class);
+    }
+}

Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml	2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
+  xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
+  xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd"
+  xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.1.xsd">
+
+  <params>
+    <param name="stream.filter.type">SAX</param>
+  </params>
+ 
+  <csv:reader fields="value"/>
+ 
+  <jb:bindings beanId="record" class="java.util.HashMap" createOnElement="csv-record">
+    <jb:value property="line" data="csv-set/csv-record/value" />
+  </jb:bindings>
+ 
+  <ftl:freemarker applyOnElement="csv-record">
+    <ftl:template> <!--${record.line}--></ftl:template>
+      <ftl:use>
+        <ftl:bindTo id="lineRecord" />
+      </ftl:use>
+  </ftl:freemarker>
+ 
+  <!-- Route the line to another service -->
+  <resource-config selector="csv-record">
+    <resource>org.jboss.soa.esb.smooks.FragmentRouter</resource>
+    <param name="beanId">lineRecord</param>
+    <param name="serviceCategory">test</param>
+    <param name="serviceName">aggregator</param>
+  </resource-config>
+ </smooks-resource-list>
+ 
\ No newline at end of file



More information about the jboss-svn-commits mailing list