[jboss-svn-commits] JBL Code SVN: r38385 - in labs/jbossesb/trunk/product: rosetta/tests/src/org/jboss/soa/esb/actions and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Jul 18 10:39:18 EDT 2013
Author: tcunning
Date: 2013-07-18 10:39:18 -0400 (Thu, 18 Jul 2013)
New Revision: 38385
Added:
labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/
labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java
labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java
labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
Log:
JBESB-3926
Check in Kevin's fix for FileStreamSplitter + tests.
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java 2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java 2013-07-18 14:39:18 UTC (rev 38385)
@@ -182,11 +182,15 @@
previousEntry.setNextEntry(entry) ;
}
}
+ entry.lock() ;
if (!entry.hasExpired())
{
- entry.lock() ;
return entry ;
}
+ else
+ {
+ entry.unlock() ;
+ }
}
}
@@ -235,7 +239,7 @@
private int aggregateCount ;
private Message message ;
- private Lock lock = new ReentrantLock() ;
+ private final Lock lock = new ReentrantLock() ;
private AggregatorEntry previousEntry ;
private AggregatorEntry nextEntry ;
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java 2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java 2013-07-18 14:39:18 UTC (rev 38385)
@@ -27,21 +27,15 @@
import static org.junit.Assert.assertTrue;
import java.io.InputStream;
-import java.net.URI;
import java.util.ArrayList;
import java.util.UUID;
import junit.framework.JUnit4TestAdapter;
import org.apache.log4j.Logger;
-import org.jboss.internal.soa.esb.couriers.MockCourier;
-import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
-import org.jboss.internal.soa.esb.services.registry.MockRegistry;
-import org.jboss.soa.esb.actions.aggregator.AbstractAggregator;
import org.jboss.soa.esb.actions.aggregator.StreamingAggregator;
import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
-import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
@@ -63,13 +57,6 @@
static Logger logger = Logger.getLogger(StreamingAggregatorUnitTest.class);
static Message message = null;
- private static EPR epr1;
- private static EPR epr2;
- private static EPR epr3;
- private static MockCourier courier1;
- private static MockCourier courier2;
- private static MockCourier courier3;
-
private static ConfigTree[] actions;
public static junit.framework.Test suite()
@@ -79,20 +66,6 @@
@BeforeClass
public static void before() throws Exception {
- MockCourierFactory.install();
- MockRegistry.install();
-
- epr1 = new EPR(new URI("test1"));
- epr2 = new EPR(new URI("test2"));
- epr3 = new EPR(new URI("DLS"));
- courier1 = new MockCourier(true);
- courier2 = new MockCourier(true);
- courier3 = new MockCourier(true);
-
- MockRegistry.register("test", "java", epr1, courier1);
- MockRegistry.register("test", "xml", epr2, courier2);
- MockRegistry.register("test", "aggregator", epr3, courier3);
-
message = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
message.getBody().add(("Hello Aggregator"));
@@ -188,4 +161,48 @@
}
}
+ /**
+ * Exercises out-of-sequence delivery of the final message, i.e. the message
+ * which contains the sequence count as created by the Smooks
+ * FileStreamSplitter.
+ * <p>
+ * For every position {@code posn} in {@code 0..recipientCount} a new series
+ * (fresh UUID) is pushed through the aggregator. The message at {@code posn}
+ * is the final message (sequence number 0 with the sequence count set to
+ * {@code recipientCount + 1}); all other slots carry ordinary messages
+ * numbered from 1 upwards. The aggregator must return {@code null} for every
+ * message except the last one processed in the series, for which it must
+ * return a non-null result.
+ *
+ * @throws Exception on any failure raised while initialising or driving the aggregator
+ */
+ @Test
+ public void aggregateStreamingTest()
+ throws Exception
+ {
+ final StreamingAggregator aggregator = new StreamingAggregator(actions[0]);
+ aggregator.initialise();
+ final int recipientCount = 3; // three messages plus the final message containing the sequence count
+ // posn is the slot at which the final (sequence count) message is delivered in this series
+ for(int posn = 0 ; posn <= recipientCount; posn++)
+ {
+ // ordinary messages are numbered from 1; sequence number 0 is used for the final message
+ int count = 1;
+ final String uuId = UUID.randomUUID().toString();
+
+ for(int idx = 0; idx <= recipientCount; idx++)
+ {
+ if (idx != posn)
+ {
+ message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS,
+ new AggregateDetails(uuId, count++));
+ }
+ else
+ {
+ // final message: sequence number 0 plus the total count (ordinary messages + this one)
+ message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS,
+ new AggregateDetails(uuId, 0));
+ ((AggregateDetails)message.getProperties().getProperty(AggregateDetails.AGGREGATE_DETAILS)).setSequenceCount(Integer.valueOf(recipientCount+1));
+ }
+ final Message responseMessage = aggregator.process(message);
+ // only the last message processed in the series may yield the aggregate
+ if (idx < recipientCount)
+ {
+ assertNull("Message returned early", responseMessage);
+ }
+ else
+ {
+ assertNotNull("No aggregated message returned", responseMessage);
+ }
+ }
+ }
+ }
}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java 2013-07-17 16:18:34 UTC (rev 38384)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java 2013-07-18 14:39:18 UTC (rev 38385)
@@ -71,7 +71,7 @@
if (sequenceCount.intValue() > 0) {
AggregateDetails aggDetails = new AggregateDetails(seriesUUID, new Integer(0));
- aggDetails.setSequenceCount(sequenceCount);
+ aggDetails.setSequenceCount(sequenceCount+1);
message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS,
aggDetails);
}
Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/AggregationTestCourier.java 2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.message.format.xml.MessageImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.util.Util;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+/**
+ * Test courier that records every message passed to {@link #deliver(Message)}.
+ * <p>
+ * Each message is run through {@link #msgToXML(MessageImpl)} and
+ * {@link #msgFromXML(String)} before being stored, so {@link #messages} holds
+ * copies rebuilt from the serialised form rather than the delivered instances.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+*/
+class AggregationTestCourier extends MockCourier {
+
+ /** Copies of the delivered messages; {@link #deliver(Message)} appends to this list. */
+ public List<Message> messages = new ArrayList<Message>();
+
+ /** Creates the courier, passing {@code true} to the MockCourier constructor. */
+ public AggregationTestCourier() {
+ super(true);
+ }
+
+ /**
+ * Rebuilds a message from its serialised form using {@code Util.deserialize}.
+ *
+ * @param xmlRepresentation the serialised message
+ * @return the deserialised message, cast to {@link MessageImpl}
+ * @throws Exception propagated from {@code Util.deserialize}
+ */
+ public static MessageImpl msgFromXML(final String xmlRepresentation)
+ throws Exception
+ {
+ return (MessageImpl) Util.deserialize(xmlRepresentation);
+ }
+
+ /**
+ * Serialises a message using {@code Util.serialize}.
+ *
+ * @param msg the message to serialise
+ * @return the serialised form, cast to {@link String}
+ * @throws Exception propagated from {@code Util.serialize}
+ */
+ public static String msgToXML(final MessageImpl msg)
+ throws Exception
+ {
+ return (String) Util.serialize(msg);
+ }
+
+ /**
+ * Stores a copy of the message obtained by serialising and deserialising it.
+ * The cast requires the message to be the xml-format {@link MessageImpl}.
+ *
+ * @param message the message being delivered
+ * @return {@code true} once the copy has been stored
+ */
+ public boolean deliver(Message message) throws CourierException, MalformedEPRException {
+ try {
+ String xmlRepresentation = msgToXML((MessageImpl)message);
+ messages.add(msgFromXML(xmlRepresentation));
+ return true;
+ } catch(Exception e) {
+ // Report the problem as a JUnit failure; Assert.fail throws, so the
+ // return below is only there to satisfy the compiler.
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ return false;
+ }
+}
Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/FileStreamSplitterAggregationUnitTest.java 2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,140 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregation;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.actions.aggregator.StreamingAggregator;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.smooks.splitting.FileStreamSplitter;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that the messages produced by {@link FileStreamSplitter} for a file
+ * can be aggregated by {@link StreamingAggregator} when the final message (the
+ * one returned by {@code compose}) is processed before the routed fragments.
+ */
+public class FileStreamSplitterAggregationUnitTest
+{
+ /** Courier registered as service test/aggregator; it captures the routed fragments. */
+ private final AggregationTestCourier testCourier = new AggregationTestCourier() ;
+
+ /**
+ * Installs the mock courier factory and registry and registers the test
+ * courier under category "test", name "aggregator".
+ *
+ * @throws Exception propagated from the mock installation or registration
+ */
+ @Before
+ public void setUp()
+ throws Exception
+ {
+ MockCourierFactory.install() ;
+ MockRegistry.install() ;
+
+ MockRegistry.register("test", "aggregator", testCourier) ;
+ }
+
+ /**
+ * Splits a temporary file of six rows, places the message returned by the
+ * splitter in front of the captured fragments and feeds all of them to a
+ * StreamingAggregator. Only the last message processed may produce a
+ * non-null aggregate.
+ *
+ * @throws Exception on any failure while splitting or aggregating
+ */
+ @Test
+ public void testStreamSplitAggregation()
+ throws Exception
+ {
+ final int numEntries = 6 ;
+ final File inputFile = createTestFile(numEntries) ;
+ try
+ {
+ final FileStreamSplitter<File> splitter = new FileStreamSplitter<File>() ;
+ final ConfigTree config = new ConfigTree("test") ;
+ // smooks-config.xml is looked up in this test's package
+ config.setAttribute("splitterConfig", getClass().getPackage().getName().replace('.', '/') + "/smooks-config.xml") ;
+ splitter.setConfiguration(config) ;
+
+ final Message lastMessage = splitter.compose(inputFile) ;
+
+ // deliver the final message first, i.e. out of sequence
+ final List<Message> messages = testCourier.messages ;
+ messages.add(0, lastMessage) ;
+
+ final int numMessages = messages.size() ;
+ assertEquals("Expected message count", numEntries + 1, numMessages) ;
+
+ final StreamingAggregator aggregator = new StreamingAggregator(new ConfigTree("aggregator")) ;
+ aggregator.initialise() ;
+
+ for(int count = 0 ; count <= numEntries ; count++)
+ {
+ final Message result = aggregator.process(messages.get(count)) ;
+ if (count < numEntries)
+ {
+ assertNull("Message returned early", result) ;
+ }
+ else
+ {
+ assertNotNull("No aggregated message returned", result) ;
+ }
+ }
+ }
+ finally
+ {
+ // best-effort cleanup of the temporary input file
+ inputFile.delete() ;
+ }
+ }
+
+ /**
+ * Creates a temporary file containing the lines row0 .. row(numEntries-1).
+ *
+ * @param numEntries number of lines to write
+ * @return the temporary file
+ * @throws Exception if the file cannot be created or opened
+ */
+ private File createTestFile(final int numEntries)
+ throws Exception
+ {
+ final File file = File.createTempFile("stream", "test") ;
+ final PrintWriter pw = new PrintWriter(file) ;
+ try
+ {
+ for(int count = 0 ; count < numEntries ; count++)
+ {
+ pw.println("row" + count) ;
+ }
+ }
+ finally
+ {
+ pw.close() ;
+ }
+ return file ;
+ }
+
+ /**
+ * Clears the captured messages and uninstalls the mock registry and
+ * courier factory.
+ *
+ * @throws Exception propagated from the mock uninstallation
+ */
+ @After
+ public void tearDown()
+ throws Exception
+ {
+ testCourier.messages.clear() ;
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ }
+
+ /**
+ * Writes the content to the given file.
+ *
+ * @param content the text to write
+ * @param to the file to write to
+ * @throws IOException if the file cannot be opened, written or closed
+ */
+ private void writeToFile(final String content, final File to)
+ throws IOException
+ {
+ final FileWriter writer = new FileWriter(to) ;
+ try
+ {
+ writer.write(content) ;
+ }
+ finally
+ {
+ // close even when write fails so the file handle is not leaked
+ writer.close() ;
+ }
+ }
+
+ /**
+ * JUnit 3 entry point wrapping this JUnit 4 test class.
+ *
+ * @return an adapter around this class
+ */
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(FileStreamSplitterAggregationUnitTest.class);
+ }
+}
Added: labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml (rev 0)
+++ labs/jbossesb/trunk/product/services/smooks/src/test/java/org/jboss/soa/esb/actions/aggregation/smooks-config.xml 2013-07-18 14:39:18 UTC (rev 38385)
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
+ xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
+ xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd"
+ xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.1.xsd">
+
+ <!-- Splitter configuration loaded by FileStreamSplitterAggregationUnitTest. -->
+
+ <!-- Select the SAX stream filter. -->
+ <params>
+ <param name="stream.filter.type">SAX</param>
+ </params>
+
+ <!-- Read the input as CSV with a single field named "value". -->
+ <csv:reader fields="value"/>
+
+ <!-- On each csv-record create a HashMap bean "record" whose "line" property is taken from the value field. -->
+ <jb:bindings beanId="record" class="java.util.HashMap" createOnElement="csv-record">
+ <jb:value property="line" data="csv-set/csv-record/value" />
+ </jb:bindings>
+
+ <!-- Apply the template on each csv-record and bind the result to bean id "lineRecord".
+ NOTE(review): the template text sits inside an XML comment; presumably Smooks unwraps it. Confirm. -->
+ <ftl:freemarker applyOnElement="csv-record">
+ <ftl:template> <!--${record.line}--></ftl:template>
+ <ftl:use>
+ <ftl:bindTo id="lineRecord" />
+ </ftl:use>
+ </ftl:freemarker>
+
+ <!-- Route the line to another service: bean "lineRecord" goes to category "test", name "aggregator". -->
+ <resource-config selector="csv-record">
+ <resource>org.jboss.soa.esb.smooks.FragmentRouter</resource>
+ <param name="beanId">lineRecord</param>
+ <param name="serviceCategory">test</param>
+ <param name="serviceName">aggregator</param>
+ </resource-config>
+ </smooks-resource-list>
+
\ No newline at end of file
More information about the jboss-svn-commits
mailing list