[jboss-svn-commits] JBL Code SVN: r37926 - in labs/jbossesb/trunk/product: samples/quickstarts/streaming_aggregator_smooks and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Mar 22 14:23:47 EDT 2012


Author: tcunning
Date: 2012-03-22 14:23:44 -0400 (Thu, 22 Mar 2012)
New Revision: 37926

Added:
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/build.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/deployment.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/hornetq-jms.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbm-queue-service.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbmq-queue-service.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jboss-esb-unfiltered.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbossesb-properties.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jndi.properties
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/lib/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/log4j.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/mytestfile.dat
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/readme.txt
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/smooks-config.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/src/
Modified:
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/FragmentRouter.java
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/AbstractStreamSplitter.java
   labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
Log:
JBESB-3746
Add aggregation details to fragments being routed by the fragment router.
The UUID and the message count are grabbed through the executionContext.   
Support for this has also been added into the FileStreamSplitter, which 
will populate the executionContext with these details.

These must be used with the new StreamingAggregator (not the old Aggregator)
because the StreamingAggregator is able to store messages without the final
sequence count - which is unavailable until Smooks finishes splitting the
source.

Also added a streaming_aggregator_smooks quickstart, which demonstrates this
functionality by splitting a file and then aggregating it.


Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/build.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/build.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/build.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,44 @@
+<project name="Quickstart_streaming_aggregator_smooks" default="run" basedir=".">
+	<description> 
+		${ant.project.name}
+		${line.separator}
+	</description>
+
+        <property name="additional.deploys" value="smooks-config.xml" />
+	
+	<import file="../conf/base-build.xml"/>
+  
+	<property name="jbossesb.name" value="jboss-esb.xml"/>
+	<property name="jbossesb.rootdir" location="${build.dir}/dirs"/>
+	<property name="jbossesb.inputdir" location="${jbossesb.rootdir}/input"/>
+	<property name="jbossesb.outputdir" location="${jbossesb.rootdir}/output"/>
+	<property name="jbossesb.errordir" location="${jbossesb.rootdir}/error"/>
+
+	<target name="runtest" depends="compile"
+		description="will create a testfile which in turn will trigger the ESB">
+		<echo>Runs Test File Creator</echo>
+		<copy file="mytestfile.dat" todir="${jbossesb.inputdir}"/>
+	</target>  
+	
+	<target name="config">
+			<delete dir="${jbossesb.rootdir}" quiet="true"/>
+			<mkdir dir="${jbossesb.rootdir}"/>
+			<mkdir dir="${jbossesb.inputdir}"/>
+			<mkdir dir="${jbossesb.outputdir}"/>
+			<mkdir dir="${jbossesb.errordir}"/>
+		
+			<antcall target="filter_jboss-esb.xml"/>
+			<copy file="log4j.xml" tofile="build/log4j.xml"/>
+		</target>
+	
+	<target name="filter_jboss-esb.xml">
+		<copy file="jboss-esb-unfiltered.xml" tofile="${basedir}/${jbossesb.name}" overwrite="true" filtering="true">
+				<filterset>
+					<filter token="INPUTDIR" value="${jbossesb.inputdir}"/>
+					<filter token="OUTPUTDIR" value="${jbossesb.outputdir}"/>
+					<filter token="ERRORDIR" value="${jbossesb.errordir}"/>
+			</filterset>
+		</copy>
+	</target>
+	
+</project>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/deployment.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/deployment.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/deployment.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,5 @@
+<?xml version="1.0"?>
+<jbossesb-deployment>
+  <jmsQueue>quickstart_streaming_aggregator_smooks_Request_esb</jmsQueue>
+  <jmsQueue>quickstart_streaming_aggregator_smooks_Request_gw</jmsQueue>
+</jbossesb-deployment>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/hornetq-jms.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/hornetq-jms.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/hornetq-jms.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,9 @@
+<?xml version="1.0"?>
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+  <queue name="quickstart_streaming_aggregator_smooks_Request_esb">
+    <entry name="queue/quickstart_streaming_aggregator_smooks_Request_esb"/>
+  </queue>
+  <queue name="quickstart_streaming_aggregator_smooks_Request_gw">
+    <entry name="queue/quickstart_streaming_aggregator_smooks_Request_gw"/>
+  </queue>
+</configuration>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbm-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbm-queue-service.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbm-queue-service.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+  <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_smooks_Request_esb"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+	<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+	<depends>jboss.messaging:service=PostOffice</depends>
+  </mbean>
+  <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_smooks_Request_gw"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+	<depends>jboss.messaging:service=PostOffice</depends>
+  </mbean>
+</server>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbmq-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbmq-queue-service.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbmq-queue-service.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+  <mbean code="org.jboss.mq.server.jmx.Queue"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_smooks_Request_esb">
+    <depends optional-attribute-name="DestinationManager">
+      jboss.mq:service=DestinationManager
+    </depends>
+  </mbean>
+  <mbean code="org.jboss.mq.server.jmx.Queue"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_smooks_Request_gw">
+    <depends optional-attribute-name="DestinationManager">
+      jboss.mq:service=DestinationManager
+    </depends>
+  </mbean>
+</server>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jboss-esb-unfiltered.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jboss-esb-unfiltered.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jboss-esb-unfiltered.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,75 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.jboss.org/repos/labs/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    parameterReloadSecs="5" >
+
+    <providers>
+          <fs-provider name="FSprovider1">
+          	<fs-bus busid="helloFileChannel" >
+          		<fs-message-filter
+          			directory="@INPUTDIR@"
+          			input-suffix=".dat"
+          			work-suffix=".esbWorking"
+          			post-delete="false"
+          			post-directory="@OUTPUTDIR@"
+          			post-suffix=".sentToEsb"
+          			error-delete="false"
+          			error-directory="@ERRORDIR@"
+          			error-suffix=".IN_ERROR"
+          		/>
+          	</fs-bus>
+          </fs-provider>
+          
+          <jms-provider name="JBossMQ" connection-factory="ConnectionFactory">
+              <jms-bus busid="quickstartGwChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_smooks_Request_gw"
+                   />
+              </jms-bus>
+              <jms-bus busid="quickstartEsbChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_smooks_Request_esb"
+                  />
+              </jms-bus>
+	 </jms-provider>
+      </providers>
+<services>
+   <service category="streaming_agg_smooks" name="SplitterService" description="Splits out the order items and routes them.">
+     <listeners>
+       <fs-listener name="FileGateway" busidref="helloFileChannel" is-gateway="true" poll-frequency-seconds="2">
+         <property name="composer-class" value="org.jboss.soa.esb.smooks.splitting.FileStreamSplitter" />
+         <property name="splitterConfig" value="/smooks-config.xml" />
+       </fs-listener>
+       <jms-listener busidref="quickstartGwChannel" name="JMSTransformListener_esb1" />
+     </listeners>
+     <actions mep="OneWay">
+       <action name="print" class="org.jboss.soa.esb.actions.SystemPrintln">
+         <property name="printfull" value="false"/>
+         <property name="message" value="[FileSteamSplitter]"/>
+       </action>
+       <action name="routeAction" class="org.jboss.soa.esb.actions.StaticRouter">
+         <property name="destinations">
+           <route-to service-category="streaming_agg_smooks" service-name="DestinationService"/>
+         </property>  
+       </action>
+     </actions>
+   </service>
+   <service category="streaming_agg_smooks" name="DestinationService" description="Routes the message">
+     <listeners>
+       <jms-listener busidref="quickstartEsbChannel" name="JMSTransformListener_esb2" />
+     </listeners>
+     <actions mep="OneWay">
+       <action class="org.jboss.soa.esb.actions.aggregator.StreamingAggregator" name="aggregateStreamedMessages">
+         <property name="timeoutInMillis" value="1000000"/>
+       </action>
+       <action name="print" class="org.jboss.soa.esb.actions.SystemPrintln">
+         <property name="printfull" value="false" />
+         <property name="message" value="[Aggregation Results]" />
+       </action>
+     </actions>
+   </service>
+</services>     
+</jbossesb>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbossesb-properties.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbossesb-properties.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jbossesb-properties.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  JBoss, Home of Professional Open Source
+  Copyright 2006, JBoss Inc., and others contributors as indicated 
+  by the @authors tag. All rights reserved. 
+  See the copyright.txt in the distribution for a
+  full listing of individual contributors. 
+  This copyrighted material is made available to anyone wishing to use,
+  modify, copy, or redistribute it subject to the terms and conditions
+  of the GNU Lesser General Public License, v. 2.1.
+  This program is distributed in the hope that it will be useful, but WITHOUT A 
+  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+  PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+  You should have received a copy of the GNU Lesser General Public License,
+  v.2.1 along with this distribution; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+  MA  02110-1301, USA.
+  
+  (C) 2005-2006,
+  @author JBoss Inc.
+-->
+<!-- $Id: jbossesb-unittest-properties.xml $ -->
+<!--
+  These options are described in the JBossESB manual.
+  Defaults are provided here for convenience only.
+ 
+  Please read through this file prior to using the system, and consider
+  updating the specified entries.
+-->
+<esb
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:noNamespaceSchemaLocation="jbossesb-1_0.xsd">
+    <properties name="core">
+		<property name="org.jboss.soa.esb.jndi.server.type" value="jboss"/>
+		<property name="org.jboss.soa.esb.jndi.server.url" value="localhost"/>
+		<property name="org.jboss.soa.esb.persistence.connection.factory" 	value="org.jboss.internal.soa.esb.persistence.format.MessageStoreFactoryImpl"/>
+        <property name="jboss.esb.invm.scope.default" value="NONE"/>
+    </properties>
+    <properties name="registry">      
+    	<property name="org.jboss.soa.esb.registry.queryManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDIInquiryService#inquire"/>
+    	<property name="org.jboss.soa.esb.registry.lifeCycleManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDIPublicationService#publish"/>
+    	<property name="org.jboss.soa.esb.registry.securityManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDISecurityService#secure"/>
+    	<property name="org.jboss.soa.esb.registry.implementationClass" value="org.jboss.internal.soa.esb.services.registry.JAXRRegistryImpl"/>
+    	<property name="org.jboss.soa.esb.registry.factoryClass" value="org.apache.ws.scout.registry.ConnectionFactoryImpl"/>
+    	<property name="org.jboss.soa.esb.registry.user" value="root"/>
+    	<property name="org.jboss.soa.esb.registry.password" value="root"/>
+    	<!-- the following parameter is scout specific to set the type of communication between scout and the UDDI (embedded, rmi, soap) -->
+    	<property name="org.jboss.soa.esb.scout.proxy.transportClass" value="org.apache.ws.scout.transport.LocalTransport"/>
+    	<property name="org.jboss.soa.esb.scout.proxy.uddiVersion" value="3.0"/>
+    	<property name="org.jboss.soa.esb.scout.proxy.uddiNameSpace" value="urn:uddi-org:api_v3"/>
+    	<!-- Organization Category to be used by this deployment. -->
+        <property name="org.jboss.soa.esb.registry.orgCategory" value="org.jboss.soa.esb.:category"/>
+    </properties>
+    <properties name="transports" depends="core">
+    	<property name="org.jboss.soa.esb.mail.smtp.host" value="localhost"/>
+    	<property name="org.jboss.soa.esb.mail.smtp.user" value="jbossesb"/>
+    	<property name="org.jboss.soa.esb.mail.smtp.password" value=""/>
+    	<property name="org.jboss.soa.esb.mail.smtp.port" value="25"/>
+    </properties>
+    <properties name="connection">
+    	<property name="min-pool-size" value="5"/>
+    	<property name="max-pool-size" value="10"/>
+    	<property name="blocking-timeout-millis" value="5000"/>
+    	<property name="abandoned-connection-timeout" value="10000"/>
+    	<property name="abandoned-connection-time-interval" value="30000"/>
+    </properties>
+    <properties name="dbstore">
+		<property name="org.jboss.soa.esb.persistence.db.connection.url" 	value="jdbc:hsqldb:hsql://localhost:9001/"/>
+		<property name="org.jboss.soa.esb.persistence.db.jdbc.driver" 		value="org.hsqldb.jdbcDriver"/>
+		<property name="org.jboss.soa.esb.persistence.db.user" 			value="sa"/>
+		<property name="org.jboss.soa.esb.persistence.db.pwd" 			value=""/>		
+		<property name="org.jboss.soa.esb.persistence.db.pool.initial.size"	value="2"/>
+		<property name="org.jboss.soa.esb.persistence.db.pool.min.size"	value="2"/>
+		<property name="org.jboss.soa.esb.persistence.db.pool.max.size"	value="5"/>
+		<!--table managed by pool to test for valid connections - created by pool automatically -->
+		<property name="org.jboss.soa.esb.persistence.db.pool.test.table"	value="pooltest"/>
+		<!-- # of milliseconds to timeout waiting for a connection from pool -->
+		<property name="org.jboss.soa.esb.persistence.db.pool.timeout.millis"	value="5000"/> 
+                <property name="org.jboss.soa.esb.persistence.db.conn.manager" value="org.jboss.internal.soa.esb.persistence.manager.StandaloneConnectionManager"/>
+    </properties>
+    <properties name="messagerouting">
+    	<property name="org.jboss.soa.esb.routing.cbrClass" value="org.jboss.internal.soa.esb.services.routing.cbr.JBossRulesRouter"/>
+    </properties>
+</esb>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jndi.properties
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jndi.properties	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/jndi.properties	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,5 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming
+java.naming.factory.url.pkgs=org.jnp.interfaces
+

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/log4j.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/log4j.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/log4j.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!--                                                                       -->
+<!--  Log4j Configuration                                                  -->
+<!--                                                                       -->
+<!-- ===================================================================== -->
+
+<!-- $Id: log4j.xml,v 1.26.2.5 2005/09/15 09:31:02 dimitris Exp $ -->
+
+<!--
+   | For more configuration information and examples see the Jakarta Log4j
+   | website: http://jakarta.apache.org/log4j
+ -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+   <!-- ============================== -->
+   <!-- Append messages to the console -->
+   <!-- ============================== -->
+
+   <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+      <errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
+      <param name="Target" value="System.out"/>
+
+      <layout class="org.apache.log4j.PatternLayout">
+         <!-- The default pattern: Date Priority [Category] Message\n -->
+         <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p [%t][%c{1}] %m%n"/>
+      </layout>
+   </appender>
+
+   <!-- ================================= -->
+   <!-- Preserve messages in a local file -->
+   <!-- ================================= -->
+
+   <!-- A size based file rolling appender -->
+   <appender name="FILE" class="org.jboss.logging.appender.RollingFileAppender">
+     <errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
+     <param name="File" value="./listener.log"/>
+     <param name="Append" value="false"/>
+     <param name="MaxFileSize" value="500KB"/>
+     <param name="MaxBackupIndex" value="1"/>
+
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d %-5p [%t][%c] %m%n"/>
+     </layout>	    
+   </appender>
+
+   <!-- ================ -->
+   <!-- Limit categories -->
+   <!-- ================ -->
+
+   <category name="org.jboss">
+      <priority value="WARN"/>
+   </category>
+   <category name="org.jboss.soa.esb">
+      <priority value="ERROR"/>
+   </category>
+   <category name="org.jboss.internal.soa.esb">
+      <priority value="ERROR"/>
+   </category>
+   <category name="org.apache">
+      <priority value="ERROR"/>
+   </category>
+   <category name="quickstart">
+      <priority value="INFO"/>
+   </category>
+   <!-- ======================= -->
+   <!-- Setup the Root category -->
+   <!-- ======================= -->
+
+   <root>
+      <appender-ref ref="CONSOLE"/>
+      <appender-ref ref="FILE"/>
+   </root>
+
+</log4j:configuration>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/mytestfile.dat
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/mytestfile.dat	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/mytestfile.dat	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,6 @@
+foo
+bar
+foo
+bar
+foo
+bar

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/readme.txt
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/readme.txt	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/readme.txt	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,45 @@
+Overview:
+=========
+  This is a basic example of using the File gateway feature of the JBoss ESB.
+  Files that are found in a particular directory with a particular extension
+  are sent to a JMS queue with actions for processing.
+
+Running this quickstart:
+========================
+  Please refer to 'ant help-quickstarts' for prerequisites about the quickstarts
+  and a more detailed description of the different ways to run the quickstarts.
+
+To Run:
+===========================
+  1. In a command terminal window in this folder ("Window1"), type 'ant deploy'.
+  2. Open another command terminal window in this folder ("Window2"), type
+     'ant runtest'.
+  3. Switch back to Application Server console to see the output from the ESB
+  4. In this folder ("Window1"), type 'ant undeploy'.
+
+What to look at in this Quickstart:
+===================================
+  This example demonstrates the use of a file gateway that by default loads the
+  file and pushes into a JMS message queue. What follows is a more detailed
+  discussion on the file gateway:
+
+  * directory - the directory to be monitored for input file messages
+  * input-suffix - the file extension to be monitored, other files will be
+    ignored
+  * work-suffix - the file extension that is used while the file is "in process"
+    by the ESB.  The file is considered to be "in process" while the gateway is
+    passing it into the ESB listener/service (in this case JMS queue).
+  * post-delete - "true" or "false". The file can be removed once it has been
+    successfully processed.
+  * post-directory - The place where the "processed" file ends up assuming no
+    errors and assuming post-delete="false"
+  * post-suffix - The file extension that is used to mark the file as
+    "completed".
+  * error-delete - "true" or "false". If there is an internal error and the file
+    fails to be loaded by the ESB, delete it.
+  * error-directory - The place to drop any file that fails to be
+    uploaded/processed.
+  * error-suffix - The file extension that is used to mark a file that has had an
+    internal error.
+    Note: Error processing in this case means the file failed to pass through
+    the gateway and into the waiting queue.

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/smooks-config.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/smooks-config.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator_smooks/smooks-config.xml	2012-03-22 18:23:44 UTC (rev 37926)
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+ <smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
+ xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
+ xmlns:file="http://www.milyn.org/xsd/smooks/file-routing-1.1.xsd"
+ xmlns:esbr="http://www.jboss.org/xsd/jbossesb/smooks/routing-1.0.xsd" 
+ xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd"
+ xmlns:jb="http://www.milyn.org/xsd/smooks/javabean-1.1.xsd">
+ 
+ <params>
+   <param name="stream.filter.type">SAX</param>
+ </params>
+ 
+ <csv:reader fields="value"/>
+ 
+ <jb:bindings beanId="record" class="java.util.HashMap" createOnElement="csv-record">
+   <jb:value property="line" data="csv-set/csv-record/value" />
+ </jb:bindings>
+ 
+ <ftl:freemarker applyOnElement="csv-record">
+   <ftl:template> <!--${record.line}--></ftl:template>
+   <ftl:use>
+     <ftl:bindTo id="lineRecord" />
+   </ftl:use>
+ </ftl:freemarker>
+ 
+ <!-- Route the line to another service -->
+ <resource-config selector="csv-record">
+   <resource>org.jboss.soa.esb.smooks.FragmentRouter</resource>
+   <param name="beanId">lineRecord</param>
+   <param name="serviceCategory">streaming_agg_smooks</param>
+   <param name="serviceName">DestinationService</param>
+ </resource-config>
+ </smooks-resource-list>

Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/FragmentRouter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/FragmentRouter.java	2012-03-22 05:29:11 UTC (rev 37925)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/FragmentRouter.java	2012-03-22 18:23:44 UTC (rev 37926)
@@ -20,6 +20,7 @@
 package org.jboss.soa.esb.smooks;
 
 import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
 import org.jboss.soa.esb.client.ServiceInvoker;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Body;
@@ -99,6 +100,17 @@
             // Try the exec context...
             object = executionContext.getAttribute(beanId);
         }
+                
+        int currentRecord = 0;
+        Integer currentCount = (Integer) executionContext.getAttribute(AggregateDetails.SEQUENCE_COUNT);
+        if (currentCount != null) {
+        	try {
+        		currentRecord = currentCount.intValue();
+        	} catch (NumberFormatException nfe) {
+        		currentRecord = 0;
+        	}
+        }
+        String seriesUUID = (String) executionContext.getAttribute(AggregateDetails.SERIES_UUID);
 
         if(object != null) {
             Message message = MessageFactory.getInstance().getMessage();
@@ -108,10 +120,13 @@
             }
 
             message.getBody().add(setPayloadLocation, object);
-
-            // TODO: Add aggregation stuff...
-            // Might be better to add something generic ala setting properties based on templates
-
+            
+            // Set aggregation details for streaming aggregator
+            currentRecord++;
+            message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, 
+            		new AggregateDetails(seriesUUID, Integer.valueOf(currentRecord))) ;
+            executionContext.setAttribute(AggregateDetails.SEQUENCE_COUNT, Integer.valueOf(currentRecord));
+            
             try {
                 serviceInvoker.deliverAsync(message);
             } catch (MessageDeliverException e) {

Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/AbstractStreamSplitter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/AbstractStreamSplitter.java	2012-03-22 05:29:11 UTC (rev 37925)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/AbstractStreamSplitter.java	2012-03-22 18:23:44 UTC (rev 37926)
@@ -42,7 +42,7 @@
  */
 public abstract class AbstractStreamSplitter implements Configurable {
 
-    private Smooks smooks;
+    protected Smooks smooks;
     private String encoding;
     private String reportPath;
 
@@ -62,12 +62,23 @@
         Source streamSource = new StreamSource(new InputStreamReader(dataStream, encoding));
 
         if(reportPath != null) {
-            ExecutionContext execContext = smooks.createExecutionContext();
-
+            ExecutionContext execContext = smooks.createExecutionContext();            
             execContext.setEventListener(new HtmlReportGenerator(reportPath));
             smooks.filterSource(execContext, streamSource);
         } else {
             smooks.filterSource(streamSource);            
         }
     }
+    
+    public void split(InputStream dataStream, ExecutionContext execContext) throws IOException {
+        Source streamSource = new StreamSource(new InputStreamReader(dataStream, encoding));
+
+        if(reportPath != null) {           
+            execContext.setEventListener(new HtmlReportGenerator(reportPath));
+            smooks.filterSource(execContext, streamSource);
+        } else {
+            smooks.filterSource(execContext, streamSource);            
+        }
+    }
+    
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java
===================================================================
--- labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java	2012-03-22 05:29:11 UTC (rev 37925)
+++ labs/jbossesb/trunk/product/services/smooks/src/main/java/org/jboss/soa/esb/smooks/splitting/FileStreamSplitter.java	2012-03-22 18:23:44 UTC (rev 37926)
@@ -19,15 +19,19 @@
  */
 package org.jboss.soa.esb.smooks.splitting;
 
+import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
 import org.jboss.soa.esb.listeners.message.MessageComposer;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.format.MessageFactory;
 import org.jboss.internal.soa.esb.assertion.AssertArgument;
 
+import org.milyn.container.ExecutionContext;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.FileInputStream;
+import java.util.UUID;
 
 /**
  * File stream splitting message composer.
@@ -47,16 +51,31 @@
             throw new MessageDeliverException("Invalid File payload.  File '" + inputFile.getAbsolutePath() + "' doesn't exist.");
         }
 
+        ExecutionContext execContext = smooks.createExecutionContext();            
+        final String seriesUUID = UUID.randomUUID().toString();
+        execContext.setAttribute(AggregateDetails.SERIES_UUID, seriesUUID);
+        
         // Split the file input stream...
         try {
-            split(new FileInputStream(inputFile));
+            split(new FileInputStream(inputFile), execContext);
         } catch (IOException e) {
             throw new MessageDeliverException("Exception while splitting file input stream for file '" + inputFile.getAbsolutePath() + "'.", e);
         }
 
         Message message = MessageFactory.getInstance().getMessage();
         message.getBody().add(inputFile.getAbsolutePath());
-        
+            
+        // If the executionContext contains a sequence count, add it to the message
+        if (execContext.getAttribute(AggregateDetails.SEQUENCE_COUNT) != null) {
+        	Integer sequenceCount = (Integer) execContext.getAttribute(AggregateDetails.SEQUENCE_COUNT);            
+   
+        	if (sequenceCount.intValue() > 0) {
+        		AggregateDetails aggDetails = new AggregateDetails(seriesUUID, new Integer(0));
+        		aggDetails.setSequenceCount(sequenceCount);
+        		message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, 
+        				aggDetails);
+        	}
+        }        
         return message;
     }
 



More information about the jboss-svn-commits mailing list