[jboss-svn-commits] JBL Code SVN: r37921 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/actions/aggregator and 12 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Mar 14 15:25:09 EDT 2012


Author: tcunning
Date: 2012-03-14 15:25:08 -0400 (Wed, 14 Mar 2012)
New Revision: 37921

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AggregateDetails.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/StreamingAggregator.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/OtherStreamingAggregatorUnitTest.xml
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregator_On_Properties_UnitTest.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/build.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/deployment.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/hornetq-jms.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbm-queue-service.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbmq-queue-service.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb-unfiltered.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbossesb-properties.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jndi.properties
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/lib/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/log4j.xml
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/readme.txt
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/IncomingComposer.java
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/test/
   labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/test/CreateTestFile.java
Log:
JBESB-3757
Add a streaming aggregator.


Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AbstractAggregator.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,400 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2012,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregator;
+
+import java.util.HashMap ;
+import java.util.Map ;
+import java.util.concurrent.locks.Lock ;
+import java.util.concurrent.locks.ReentrantLock ;
+
+import org.apache.log4j.Logger ;
+import org.jboss.soa.esb.ConfigurationException ;
+import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor ;
+import org.jboss.soa.esb.actions.ActionLifecycleException ;
+import org.jboss.soa.esb.actions.ActionProcessingException ;
+import org.jboss.soa.esb.helpers.ConfigTree ;
+import org.jboss.soa.esb.message.Message ;
+import org.jboss.soa.esb.message.format.MessageFactory ;
+
+/**
+ * Abstract action class which defines how aggregation is processed but 
+ * leaves abstract how messages are stored.
+ * 
+ * @author Kevin Conner
+ */
+public abstract class AbstractAggregator extends AbstractActionPipelineProcessor
+{
+    private static final Logger LOGGER = Logger.getLogger(AbstractAggregator.class) ;
+    
+    private static final long DEFAULT_TIMEOUT = 600000L ;
+
+    private final long timeout ;
+    
+    private TimeoutProcessor timeoutProcessor ;
+    
+    private AggregatorEntry head ;
+    private Map<String, AggregatorEntry> seriesUUIDtoEntry ;
+
+    protected AbstractAggregator(final ConfigTree config)
+        throws ConfigurationException
+    {
+        final String timeoutValue = config.getAttribute("timeoutInMillis") ;
+        if (timeoutValue != null)
+        {
+            long parsedTimeout ;
+            try
+            {
+                parsedTimeout = Long.valueOf(timeoutValue) ;
+            }
+            catch (final NumberFormatException nfe)
+            {
+                parsedTimeout = DEFAULT_TIMEOUT ;
+                throw new ConfigurationException("Failed to parse timeout " + timeoutValue) ;
+            }
+            timeout = parsedTimeout ;
+        }
+        else
+        {
+            timeout = DEFAULT_TIMEOUT ;
+        }
+    }
+    
+    @Override
+    public void initialise() throws ActionLifecycleException
+    {
+        head = new AggregatorEntry(null) ;
+        head.setNextEntry(head) ;
+        head.setPreviousEntry(head) ;
+        
+        seriesUUIDtoEntry = new HashMap<String, AggregatorEntry>() ;
+        
+        timeoutProcessor = new TimeoutProcessor() ;
+        timeoutProcessor.start() ;
+    }
+
+    @Override
+    public void destroy() throws ActionLifecycleException
+    {
+        timeoutProcessor.stop() ;
+        timeoutProcessor = null ;
+    }
+    
+    @Override
+    public Message process(Message message)
+        throws ActionProcessingException
+    {
+        final Object detailObject = message.getProperties().getProperty(AggregateDetails.AGGREGATE_DETAILS) ;
+        if ((detailObject == null) || !(detailObject instanceof AggregateDetails))
+        {
+            return message ;
+        }
+        
+        final AggregateDetails aggregateDetails = (AggregateDetails)detailObject ;
+        
+        final String seriesUUID = aggregateDetails.getSeriesUUID() ;
+        
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Aggregating message into " + seriesUUID) ;
+        }
+        
+        AggregatorEntry entry = retrieveEntry(seriesUUID) ;
+        try
+        {
+            Message entryMessage = entry.getMessage() ;
+            if (entryMessage == null)
+            {
+                entryMessage = MessageFactory.getInstance().getMessage() ;
+                entryMessage.getProperties().setProperty(AggregateDetails.SERIES_UUID, seriesUUID) ;
+                entry.setMessage(entryMessage) ;
+            }
+            final Integer messageSequence = aggregateDetails.getMessageSequence() ;
+            if (aggregateMessage(entryMessage, messageSequence, message))
+            {
+                final int aggregateCount = entry.incAggregateCount() ;
+                
+                final Integer sequenceCount = aggregateDetails.getSequenceCount() ;
+                if (sequenceCount != null)
+                {
+                    entry.setSequenceCount(sequenceCount.intValue()) ;
+                    entryMessage.getProperties().setProperty(AggregateDetails.SEQUENCE_COUNT, sequenceCount) ;
+                }
+                final int entrySequenceCount = entry.getSequenceCount() ;
+                if ((entrySequenceCount == 0) || (aggregateCount < entrySequenceCount))
+                {
+                    entryMessage = null ;
+                }
+                else
+                {
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Aggregated message for " + seriesUUID + ", aggregateCount " + aggregateCount) ;
+                    }
+                    entry.setMessage(null) ;
+                }
+            }
+            return entryMessage ;
+        }
+        finally
+        {
+            releaseEntry(entry) ;
+        }
+    }
+    
+    private AggregatorEntry retrieveEntry(final String seriesUUID)
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Retrieving entry for " + seriesUUID) ;
+        }
+        while(true)
+        {
+            AggregatorEntry entry ;
+            synchronized(head)
+            {
+                entry = seriesUUIDtoEntry.get(seriesUUID) ;
+                if (entry == null)
+                {
+                    entry = new AggregatorEntry(seriesUUID) ;
+                    seriesUUIDtoEntry.put(seriesUUID, entry) ;
+                    final AggregatorEntry previousEntry = head.getPreviousEntry() ;
+                    entry.setNextEntry(head) ;
+                    head.setPreviousEntry(entry) ;
+                    entry.setPreviousEntry(previousEntry) ;
+                    previousEntry.setNextEntry(entry) ;
+                }
+            }
+            if (!entry.hasExpired())
+            {
+                entry.lock() ;
+                return entry ;
+            }
+        }
+    }
+
+    private void releaseEntry(final AggregatorEntry entry)
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Releasing entry for " + entry.getSeriesUUID()) ;
+        }
+        if (entry.getMessage() == null)
+        {
+            removeEntry(entry) ;
+        }
+        entry.unlock() ;
+    }
+
+    private void removeEntry(final AggregatorEntry entry)
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Removing entry for " + entry.getSeriesUUID()) ;
+        }
+        synchronized(head)
+        {
+            if (!entry.hasExpired())
+            {
+                seriesUUIDtoEntry.remove(entry.getSeriesUUID()) ;
+                final AggregatorEntry nextEntry = entry.getNextEntry() ;
+                final AggregatorEntry previousEntry = entry.getPreviousEntry() ;
+                entry.setNextEntry(null) ;
+                entry.setPreviousEntry(null) ;
+                nextEntry.setPreviousEntry(previousEntry) ;
+                previousEntry.setNextEntry(nextEntry) ;
+            }
+        }
+    }
+    
+    protected abstract boolean aggregateMessage(final Message aggregatedMessage, final Integer messageSequence, final Message currentMessage)
+        throws ActionProcessingException ;
+    
+    private final class AggregatorEntry
+    {
+        private final long expiry ;
+        private final String seriesUUID ;
+        private int sequenceCount ;
+        private int aggregateCount ;
+        private Message message ;
+
+        private Lock lock = new ReentrantLock() ;
+        private AggregatorEntry previousEntry ;
+        private AggregatorEntry nextEntry ;
+        
+        AggregatorEntry(final String seriesUUID)
+        {
+            expiry = System.currentTimeMillis() + timeout ;
+            this.seriesUUID = seriesUUID ;
+        }
+
+        long getExpiry()
+        {
+            return expiry ;
+        }
+        
+        String getSeriesUUID()
+        {
+            return seriesUUID ;
+        }
+        
+        void setSequenceCount(final int sequenceCount)
+        {
+            this.sequenceCount = sequenceCount ;
+        }
+        
+        int getSequenceCount()
+        {
+            return sequenceCount ;
+        }
+        
+        void setMessage(final Message message)
+        {
+            this.message = message ;
+        }
+        
+        Message getMessage()
+        {
+            return message ;
+        }
+        
+        int incAggregateCount()
+        {
+            return ++aggregateCount ;
+        }
+        
+        void lock()
+        {
+            lock.lock() ;
+        }
+        
+        void unlock()
+        {
+            lock.unlock() ;
+        }
+        
+        void setPreviousEntry(final AggregatorEntry previousEntry)
+        {
+            this.previousEntry = previousEntry;
+        }
+        
+        AggregatorEntry getPreviousEntry()
+        {
+            return previousEntry ;
+        }
+        
+        void setNextEntry(final AggregatorEntry nextEntry)
+        {
+            this.nextEntry = nextEntry ;
+        }
+        
+        AggregatorEntry getNextEntry()
+        {
+            return nextEntry ;
+        }
+        
+        boolean hasExpired()
+        {
+            return nextEntry == null ;
+        }
+    }
+    
+    private final class TimeoutProcessor implements Runnable
+    {
+        private Thread thread ;
+        private volatile boolean active ;
+        
+        @Override
+        public void run()
+        {
+            LOGGER.debug("Starting timeout processor") ;
+            synchronized(head)
+            {
+                while(active)
+                {
+                    final long now = System.currentTimeMillis() ;
+                    
+                    final AggregatorEntry firstEntry = head.getNextEntry()  ;
+                    try
+                    {
+                        if (firstEntry == head)
+                        {
+                            if (LOGGER.isDebugEnabled())
+                            {
+                                LOGGER.debug("Empty aggregator list, waiting for " + timeout) ;
+                            }
+                            head.wait(timeout) ;
+                        }
+                        else if (firstEntry.getExpiry() > now)
+                        {
+                            final long waitPeriod = firstEntry.getExpiry() - now ;
+                            if (LOGGER.isDebugEnabled())
+                            {
+                                LOGGER.debug("First entry not expired, waiting for " + waitPeriod) ;
+                            }
+                            head.wait(waitPeriod) ;
+                        }
+                        else
+                        {
+                            if (LOGGER.isDebugEnabled())
+                            {
+                                LOGGER.debug("Timing out entry for " + firstEntry.getSeriesUUID()) ;
+                            }
+                            removeEntry(firstEntry) ;
+                        }
+                    }
+                    catch (final InterruptedException ie) {} // ignore
+                }
+            }
+            LOGGER.debug("Stopping timeout processor") ;
+        }
+        
+        public void start()
+        {
+            if (thread == null)
+            {
+                thread = new Thread(this) ;
+                active = true ;
+                thread.start();
+            }
+        }
+        
+        public void stop()
+        {
+            if (thread != null)
+            {
+                active = false ;
+                synchronized(head)
+                {
+                    head.notify() ;
+                }
+                try
+                {
+                    thread.join() ;
+                }
+                catch (final InterruptedException ie)
+                {
+                    LOGGER.warn("Unexpected interruption while waiting for timeout processor to stop") ;
+                }
+                thread = null ;
+            }
+        }
+    }
+}
\ No newline at end of file

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AggregateDetails.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AggregateDetails.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/AggregateDetails.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2012,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregator;
+
+import java.io.Serializable ;
+
+/**
+ * Serializable holder describing the place of a message within an
+ * aggregation series: the series it belongs to, its own sequence number
+ * and, optionally, the total number of messages in the series.
+ * 
+ * @author Kevin Conner
+ */
+public class AggregateDetails implements Serializable
+{
+    /** Name of the message property under which an instance of this class is looked up. */
+    public static final String AGGREGATE_DETAILS = "Aggregate.AggregateDetails" ;
+    /** Name of the message property holding the series UUID. */
+    public static final String SERIES_UUID = "Aggregate.SeriesUUID" ;
+    /** Name of the message property holding the sequence count. */
+    public static final String SEQUENCE_COUNT = "Aggregate.SequenceCount" ;
+    
+    private static final long serialVersionUID = -7718553041580150082L ;
+    
+    /** Identifier of the series, fixed at construction. */
+    private final String seriesUUID ;
+    /** Sequence number of the message, fixed at construction. */
+    private final Integer messageSequence ;
+    /** Total number of messages in the series; null until set. */
+    private Integer sequenceCount ;
+    
+    /**
+     * Create the details for one message of a series.
+     * 
+     * @param seriesUUID the identifier of the series.
+     * @param messageSequence the sequence number of the message.
+     */
+    public AggregateDetails(final String seriesUUID, final Integer messageSequence)
+    {
+        this.messageSequence = messageSequence ;
+        this.seriesUUID = seriesUUID ;
+    }
+    
+    /**
+     * @return the identifier of the series.
+     */
+    public String getSeriesUUID()
+    {
+        return this.seriesUUID ;
+    }
+    
+    /**
+     * @return the sequence number of the message.
+     */
+    public Integer getMessageSequence()
+    {
+        return this.messageSequence ;
+    }
+    
+    /**
+     * @return the total number of messages in the series, or null if it has not been set.
+     */
+    public Integer getSequenceCount()
+    {
+        return this.sequenceCount ;
+    }
+    
+    /**
+     * @param sequenceCount the total number of messages in the series.
+     */
+    public void setSequenceCount(final Integer sequenceCount)
+    {
+        this.sequenceCount = sequenceCount ;
+    }
+}
\ No newline at end of file

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/StreamingAggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/StreamingAggregator.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/aggregator/StreamingAggregator.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2012,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.actions.aggregator;
+
+import java.util.TreeMap;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Body;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Concrete implementation of the AbstractAggregator class which 
+ * stores the bodies of the aggregated messages in a map, keyed and
+ * ordered by message sequence, held in the body of the aggregated message.
+ * 
+ * @author Kevin Conner
+ */
+public class StreamingAggregator extends AbstractAggregator
+{
+    /**
+     * Create the aggregator from its action configuration.
+     * 
+     * @param config the action configuration.
+     * @throws ConfigurationException if the configuration is rejected by the superclass.
+     */
+    public StreamingAggregator(final ConfigTree config)
+        throws ConfigurationException
+    {
+        super(config) ;
+    }
+
+    /**
+     * Add the body of the current message to the aggregated message.  The
+     * first message seen for a sequence number wins, later duplicates are
+     * ignored.
+     * 
+     * @param aggregatedMessage the message holding the aggregation so far.
+     * @param messageSequence the sequence number of the current message.
+     * @param currentMessage the message whose body is to be added.
+     * @return true if the body was added, false if the sequence number was already present.
+     * @throws ActionProcessingException if no message sequence was supplied.
+     */
+    @Override
+    protected boolean aggregateMessage(final Message aggregatedMessage, final Integer messageSequence, final Message currentMessage)
+        throws ActionProcessingException
+    {
+        if (messageSequence == null)
+        {
+            // a sorted map cannot order a null key, report it as a processing failure
+            throw new ActionProcessingException("Cannot aggregate a message without a message sequence") ;
+        }
+        final Body body = aggregatedMessage.getBody() ;
+        // assumes the aggregate body was only ever populated by earlier calls to this method
+        @SuppressWarnings("unchecked")
+        TreeMap<Integer, Object> aggregatedMessages = (TreeMap<Integer, Object>) body.get() ;
+        if (aggregatedMessages == null)
+        {
+            aggregatedMessages = new TreeMap<Integer, Object>() ;
+            body.add(aggregatedMessages) ;
+        }
+        // Test for the key rather than the previous value, a stored body may
+        // legitimately be null and must still count as already aggregated.
+        if (aggregatedMessages.containsKey(messageSequence))
+        {
+            return false ;
+        }
+        aggregatedMessages.put(messageSequence, currentMessage.getBody().get()) ;
+        return true ;
+    }
+
+}

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/OtherStreamingAggregatorUnitTest.xml
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/OtherStreamingAggregatorUnitTest.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/OtherStreamingAggregatorUnitTest.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,6 @@
+<testActions>
+	<action class="org.jboss.soa.esb.actions.Aggregator" name="Aggregator"
+		service-category="test" service-name="Aggregator" />
+	<action class="org.jboss.soa.esb.actions.Aggregator" name="Aggregator"
+		service-category="test" service-name="Aggregator"/>
+</testActions>

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregatorUnitTest.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+
+import org.jboss.soa.esb.actions.aggregator.AbstractAggregator;	
+import org.jboss.soa.esb.actions.aggregator.StreamingAggregator;
+import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.message.format.MessageType;
+import org.jboss.soa.esb.testutils.FileUtil;
+import org.jboss.soa.esb.util.ClassUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the StreamingAggregator class
+ *
+ * @author <a href="mailto:tcunning at jboss.com">Tom Cunningham</a>
+ */
+
+
+public class StreamingAggregatorUnitTest
+{
+    static Logger logger = Logger.getLogger(StreamingAggregatorUnitTest.class);
+    /** Message shared by the tests; its aggregate details are replaced before every send. */
+    static Message message = null;
+    
+    private static EPR epr1;
+    private static EPR epr2;
+    private static EPR epr3;
+    private static MockCourier courier1;
+    private static MockCourier courier2;
+    private static MockCourier courier3;
+    
+    /** Action configurations read from StreamingAggregator_On_Properties_UnitTest.xml. */
+    private static ConfigTree[] actions;
+    
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(StreamingAggregatorUnitTest.class);
+    }
+    
+    /**
+     * Install the mock couriers and registry, create the shared message and
+     * load the action configurations.
+     */
+    @BeforeClass
+    public static void before() throws Exception {
+        MockCourierFactory.install();
+        MockRegistry.install();
+
+        epr1 = new EPR(new URI("test1"));
+        epr2 = new EPR(new URI("test2"));
+        epr3 = new EPR(new URI("DLS"));
+        courier1 = new MockCourier(true);
+        courier2 = new MockCourier(true);
+        courier3 = new MockCourier(true);
+
+        MockRegistry.register("test", "java", epr1, courier1);
+        MockRegistry.register("test", "xml", epr2, courier2);
+        MockRegistry.register("test", "aggregator", epr3, courier3);
+ 
+        message = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
+        message.getBody().add(("Hello Aggregator"));
+        
+        InputStream in = ClassUtil.getResourceAsStream("StreamingAggregator_On_Properties_UnitTest.xml", StreamingAggregatorUnitTest.class);
+        String xml = FileUtil.readStream(in);
+        actions = ConfigTree.fromXml(xml).getChildren("action");
+    }
+    
+    /**
+     * Tag the shared message as one member of a series.
+     */
+    private static void tagMessage(final String seriesUUID, final int sequence, final int sequenceCount)
+    {
+        final AggregateDetails details = new AggregateDetails(seriesUUID, sequence);
+        details.setSequenceCount(Integer.valueOf(sequenceCount));
+        message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, details);
+    }
+    
+    /**
+     * Only the last message of a three message series may produce an aggregate.
+     */
+    @Test
+    public void aggregateThreeMessages() throws Exception
+    {
+        final StreamingAggregator aggregator = new StreamingAggregator(actions[0]);
+        aggregator.initialise();
+        try
+        {
+            final int recipientCount = 3;
+            final String uuId = UUID.randomUUID().toString();
+            
+            for (int i = 0; i < recipientCount; i++)
+            {
+                tagMessage(uuId, i, recipientCount);
+                
+                final Message responseMessage = aggregator.process(message);
+                if (i < recipientCount - 1) {
+                    assertNull(responseMessage);
+                } else {
+                    assertNotNull(responseMessage);
+                }
+            }
+        }
+        finally
+        {
+            // stop the timeout thread started by initialise()
+            aggregator.destroy();
+        }
+    }
+    
+    /**
+     * An action without "timeoutInMillis" must still be constructed and initialised.
+     */
+    @Test
+    public void aggregateWithoutTimeout() throws Exception
+    {
+        InputStream in = ClassUtil.getResourceAsStream("OtherStreamingAggregatorUnitTest.xml", StreamingAggregatorUnitTest.class);
+        String xml = FileUtil.readStream(in);
+        ConfigTree[] acts = ConfigTree.fromXml(xml).getChildren("action");
+        
+        final StreamingAggregator aggregator = new StreamingAggregator(acts[0]);
+        aggregator.initialise();
+        // stop the timeout thread started by initialise()
+        aggregator.destroy();
+    }
+    
+    /**
+     * With a 100ms timeout and 200ms between messages the series never completes.
+     */
+    @Test
+    public void aggregateTimeoutTest1() throws Exception
+    {
+        final StreamingAggregator aggregator = new StreamingAggregator(actions[1]);
+        aggregator.initialise();
+        try
+        {
+            final int recipientCount = 3;
+            final String uuId = UUID.randomUUID().toString();
+            
+            for (int i = 0; i < recipientCount; i++)
+            {
+                tagMessage(uuId, i, recipientCount);
+                
+                // longer than the configured timeout of the second action
+                Thread.sleep(200);
+                
+                final Message responseMessage = aggregator.process(message);
+                
+                //all message should be expired
+                assertNull(responseMessage);
+            }
+        }
+        finally
+        {
+            // stop the timeout thread started by initialise()
+            aggregator.destroy();
+        }
+    }
+    
+}
\ No newline at end of file

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregator_On_Properties_UnitTest.xml
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregator_On_Properties_UnitTest.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/StreamingAggregator_On_Properties_UnitTest.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,6 @@
+<testActions>
+	<action class="org.jboss.soa.esb.actions.aggregator.StreamingAggregator" name="Aggregator" timeoutInMillis="60000"
+		service-category="test" service-name="Aggregator"/>
+	<action class="org.jboss.soa.esb.actions.aggregator.StreamingAggregator" name="Aggregator" timeoutInMillis="100"
+		service-category="test" service-name="Aggregator" />
+</testActions>
\ No newline at end of file

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/build.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/build.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/build.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,50 @@
+<project name="Quickstart_streaming_aggregator" default="run" basedir=".">
+	
+	<description> 
+		${ant.project.name}
+		${line.separator}
+	</description>
+	
+	<!-- Import the base Ant build script... -->
+	<import file="../conf/base-build.xml"/>
+
+        <property name="jbossesb.name" value="jboss-esb.xml"/>
+        <property name="jbossesb.rootdir" location="${build.dir}/dirs"/>
+        <property name="jbossesb.inputdir" location="${jbossesb.rootdir}/input"/>
+        <property name="jbossesb.outputdir" location="${jbossesb.rootdir}/output"/>
+        <property name="jbossesb.errordir" location="${jbossesb.rootdir}/error"/>
+
+        <target name="filter_jboss-esb.xml">
+            <copy file="jboss-esb-unfiltered.xml" tofile="${basedir}/${jbossesb.name}" overwrite="true" filtering="true">
+                <filterset>
+                    <filter token="INPUTDIR" value="${jbossesb.inputdir}"/>
+                    <filter token="OUTPUTDIR" value="${jbossesb.outputdir}"/>
+                    <filter token="ERRORDIR" value="${jbossesb.errordir}"/>
+                </filterset>
+            </copy>
+        </target>
+
+        <target name="config">
+            <delete dir="${jbossesb.rootdir}" quiet="true"/>
+            <mkdir dir="${jbossesb.rootdir}"/>
+            <mkdir dir="${jbossesb.inputdir}"/>
+            <mkdir dir="${jbossesb.outputdir}"/>
+            <mkdir dir="${jbossesb.errordir}"/>
+
+            <antcall target="filter_jboss-esb.xml"/>
+            <copy file="log4j.xml" tofile="build/log4j.xml"/>
+        </target>
+
+        <target name="runtest" depends="compile"
+                description="will create a testfile which in turn will trigger the ESB">
+                <echo>Runs Test File Creator</echo>
+                <java fork="yes" classname="org.jboss.soa.esb.samples.quickstart.sampleaggregator.test.CreateTestFile" failonerror="true">
+                        <sysproperty key="log4j.configuration" value="${log4j.xml}"/>
+                        <arg value="${jbossesb.inputdir}"/>
+                        <arg value="MyInput.dat"/> <!-- name of the file to create in the input directory -->
+                        <arg value="100"/> <!-- lines of content to create -->
+                        <classpath refid="exec-classpath"/>
+                </java>
+        </target>
+
+</project>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/deployment.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/deployment.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/deployment.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,5 @@
+<?xml version="1.0"?>
+<jbossesb-deployment>
+  <jmsQueue>quickstart_streaming_aggregator_Request_esb</jmsQueue>
+  <jmsQueue>quickstart_streaming_aggregator_Request_gw</jmsQueue>
+</jbossesb-deployment>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/hornetq-jms.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/hornetq-jms.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/hornetq-jms.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,9 @@
+<?xml version="1.0"?>
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+  <queue name="quickstart_streaming_aggregator_Request_esb">
+    <entry name="queue/quickstart_streaming_aggregator_Request_esb"/>
+  </queue>
+  <queue name="quickstart_streaming_aggregator_Request_gw">
+    <entry name="queue/quickstart_streaming_aggregator_Request_gw"/>
+  </queue>
+</configuration>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbm-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbm-queue-service.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbm-queue-service.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+  <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_Request_esb"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+	<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+	<depends>jboss.messaging:service=PostOffice</depends>
+  </mbean>
+  <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_Request_gw"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+	<depends>jboss.messaging:service=PostOffice</depends>
+  </mbean>
+</server>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbmq-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbmq-queue-service.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbmq-queue-service.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+  <mbean code="org.jboss.mq.server.jmx.Queue"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_Request_esb">
+    <depends optional-attribute-name="DestinationManager">
+      jboss.mq:service=DestinationManager
+    </depends>
+  </mbean>
+  <mbean code="org.jboss.mq.server.jmx.Queue"
+    name="jboss.esb.quickstart.destination:service=Queue,name=quickstart_streaming_aggregator_Request_gw">
+    <depends optional-attribute-name="DestinationManager">
+      jboss.mq:service=DestinationManager
+    </depends>
+  </mbean>
+</server>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb-unfiltered.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb-unfiltered.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb-unfiltered.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,69 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.jboss.org/repos/labs/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    parameterReloadSecs="5">
+
+    <providers> 
+
+          <fs-provider name="AggFileProvider">
+              <fs-bus busid="AggFileChannel">
+              <fs-message-filter
+                  directory="@INPUTDIR@"
+                  error-delete="false"
+                  error-directory="@ERRORDIR@"
+                  error-suffix=".IN_ERROR" input-suffix=".dat" post-delete="false"
+                  post-directory="@OUTPUTDIR@"
+                  post-suffix=".sentToEsb" work-suffix=".esbWorking"/>
+              </fs-bus>
+          </fs-provider>
+
+          <jms-provider name="JBossMQ" connection-factory="ConnectionFactory">
+              <jms-bus busid="quickstartGwChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_Request_gw"
+                   />
+              </jms-bus>
+              <jms-bus busid="quickstartEsbChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_Request_esb"
+
+                  />
+              </jms-bus>
+
+          </jms-provider>
+      </providers>
+      
+      <services>
+        <service 
+        	category="FirstServiceESB" 
+        	name="SimpleListener" 
+        	description="Streaming Aggregator">
+            <listeners>
+	        <fs-listener busidref="AggFileChannel" is-gateway="true"
+                    name="StreamingFileListener" schedule-frequency="5"
+                    maxThreads="10">
+                    <property name="composer-class" value="org.jboss.soa.esb.samples.quickstart.sampleaggregator.IncomingComposer"/>
+                </fs-listener>
+                <jms-listener name="streamingAgg"
+                              busidref="quickstartEsbChannel"
+                />
+            </listeners>
+            <actions mep="OneWay">
+                    <!-- The next action is for Continuous Integration testing -->
+	            <action class="org.jboss.soa.esb.actions.aggregator.StreamingAggregator" name="aggregateStreamedMessages">
+                        <property name="timeoutInMillis" value="1000000"/>
+                    </action>
+
+                    <action name="dump" class="org.jboss.soa.esb.actions.SystemPrintln">
+                        <property name="printfull" value="true"/>
+                    </action>
+
+                    <action name="testStore" class="org.jboss.soa.esb.actions.TestMessageStore"/>
+            </actions>
+        </service>
+      </services>
+     
+</jbossesb>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jboss-esb.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,69 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.jboss.org/repos/labs/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
+    parameterReloadSecs="5">
+
+    <providers> 
+
+          <fs-provider name="AggFileProvider">
+              <fs-bus busid="AggFileChannel">
+              <fs-message-filter
+                  directory="/home/tcunning/src/jbossesb/trunkthree/product/samples/quickstarts/streaming_aggregator/build/dirs/input"
+                  error-delete="false"
+                  error-directory="/home/tcunning/src/jbossesb/trunkthree/product/samples/quickstarts/streaming_aggregator/build/dirs/error"
+                  error-suffix=".IN_ERROR" input-suffix=".dat" post-delete="false"
+                  post-directory="/home/tcunning/src/jbossesb/trunkthree/product/samples/quickstarts/streaming_aggregator/build/dirs/output"
+                  post-suffix=".sentToEsb" work-suffix=".esbWorking"/>
+              </fs-bus>
+          </fs-provider>
+
+          <jms-provider name="JBossMQ" connection-factory="ConnectionFactory">
+              <jms-bus busid="quickstartGwChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_Request_gw"
+                   />
+              </jms-bus>
+              <jms-bus busid="quickstartEsbChannel">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/quickstart_streaming_aggregator_Request_esb"
+
+                  />
+              </jms-bus>
+
+          </jms-provider>
+      </providers>
+      
+      <services>
+        <service 
+        	category="FirstServiceESB" 
+        	name="SimpleListener" 
+        	description="Streaming Aggregator">
+            <listeners>
+	        <fs-listener busidref="AggFileChannel" is-gateway="true"
+                    name="StreamingFileListener" schedule-frequency="5"
+                    maxThreads="10">
+                    <property name="composer-class" value="org.jboss.soa.esb.samples.quickstart.sampleaggregator.IncomingComposer"/>
+                </fs-listener>
+                <jms-listener name="streamingAgg"
+                              busidref="quickstartEsbChannel"
+                />
+            </listeners>
+            <actions mep="OneWay">
+                    <!-- The next action is for Continuous Integration testing -->
+	            <action class="org.jboss.soa.esb.actions.aggregator.StreamingAggregator" name="aggregateStreamedMessages">
+                        <property name="timeoutInMillis" value="1000000"/>
+                    </action>
+
+                    <action name="dump" class="org.jboss.soa.esb.actions.SystemPrintln">
+                        <property name="printfull" value="true"/>
+                    </action>
+
+                    <action name="testStore" class="org.jboss.soa.esb.actions.TestMessageStore"/>
+            </actions>
+        </service>
+      </services>
+     
+</jbossesb>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbossesb-properties.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbossesb-properties.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jbossesb-properties.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  JBoss, Home of Professional Open Source
+  Copyright 2006, JBoss Inc., and others contributors as indicated 
+  by the @authors tag. All rights reserved. 
+  See the copyright.txt in the distribution for a
+  full listing of individual contributors. 
+  This copyrighted material is made available to anyone wishing to use,
+  modify, copy, or redistribute it subject to the terms and conditions
+  of the GNU Lesser General Public License, v. 2.1.
+  This program is distributed in the hope that it will be useful, but WITHOUT A 
+  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+  PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+  You should have received a copy of the GNU Lesser General Public License,
+  v.2.1 along with this distribution; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+  MA  02110-1301, USA.
+  
+  (C) 2005-2006,
+  @author JBoss Inc.
+-->
+<!-- $Id: jbossesb-unittest-properties.xml $ -->
+<!--
+  These options are described in the JBossESB manual.
+  Defaults are provided here for convenience only.
+ 
+  Please read through this file prior to using the system, and consider
+  updating the specified entries.
+-->
+<esb
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:noNamespaceSchemaLocation="jbossesb-1_0.xsd">
+    <properties name="core">
+		<property name="org.jboss.soa.esb.jndi.server.type" value="jboss"/>
+		<property name="org.jboss.soa.esb.jndi.server.url" value="localhost"/>
+		<property name="org.jboss.soa.esb.persistence.connection.factory" 	value="org.jboss.internal.soa.esb.persistence.format.MessageStoreFactoryImpl"/>
+        <property name="jboss.esb.invm.scope.default" value="NONE"/>
+    </properties>
+    <properties name="registry">      
+    	<property name="org.jboss.soa.esb.registry.queryManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDIInquiryService#inquire"/>
+    	<property name="org.jboss.soa.esb.registry.lifeCycleManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDIPublicationService#publish"/>
+    	<property name="org.jboss.soa.esb.registry.securityManagerURI" value="org.apache.juddi.v3.client.transport.wrapper.UDDISecurityService#secure"/>
+    	<property name="org.jboss.soa.esb.registry.implementationClass" value="org.jboss.internal.soa.esb.services.registry.JAXRRegistryImpl"/>
+    	<property name="org.jboss.soa.esb.registry.factoryClass" value="org.apache.ws.scout.registry.ConnectionFactoryImpl"/>
+    	<property name="org.jboss.soa.esb.registry.user" value="root"/>
+    	<property name="org.jboss.soa.esb.registry.password" value="root"/>
+    	<!-- the following parameter is scout specific to set the type of communication between scout and the UDDI (embedded, rmi, soap) -->
+    	<property name="org.jboss.soa.esb.scout.proxy.transportClass" value="org.apache.ws.scout.transport.LocalTransport"/>
+    	<property name="org.jboss.soa.esb.scout.proxy.uddiVersion" value="3.0"/>
+    	<property name="org.jboss.soa.esb.scout.proxy.uddiNameSpace" value="urn:uddi-org:api_v3"/>
+    	<!-- Organization Category to be used by this deployment. -->
+        <property name="org.jboss.soa.esb.registry.orgCategory" value="org.jboss.soa.esb.:category"/>
+    </properties>
+    <properties name="transports" depends="core">
+    	<property name="org.jboss.soa.esb.mail.smtp.host" value="localhost"/>
+    	<property name="org.jboss.soa.esb.mail.smtp.user" value="jbossesb"/>
+    	<property name="org.jboss.soa.esb.mail.smtp.password" value=""/>
+    	<property name="org.jboss.soa.esb.mail.smtp.port" value="25"/>
+    </properties>
+    <properties name="connection">
+    	<property name="min-pool-size" value="5"/>
+    	<property name="max-pool-size" value="10"/>
+    	<property name="blocking-timeout-millis" value="5000"/>
+    	<property name="abandoned-connection-timeout" value="10000"/>
+    	<property name="abandoned-connection-time-interval" value="30000"/>
+    </properties>
+    <properties name="dbstore">
+		<property name="org.jboss.soa.esb.persistence.db.connection.url" 	value="jdbc:hsqldb:hsql://localhost:9001/"/>
+		<property name="org.jboss.soa.esb.persistence.db.jdbc.driver" 		value="org.hsqldb.jdbcDriver"/>
+		<property name="org.jboss.soa.esb.persistence.db.user" 			value="sa"/>
+		<property name="org.jboss.soa.esb.persistence.db.pwd" 			value=""/>		
+		<property name="org.jboss.soa.esb.persistence.db.pool.initial.size"	value="2"/>
+		<property name="org.jboss.soa.esb.persistence.db.pool.min.size"	value="2"/>
+		<property name="org.jboss.soa.esb.persistence.db.pool.max.size"	value="5"/>
+		<!--table managed by pool to test for valid connections - created by pool automatically -->
+		<property name="org.jboss.soa.esb.persistence.db.pool.test.table"	value="pooltest"/>
+		<!-- # of milliseconds to timeout waiting for a connection from pool -->
+		<property name="org.jboss.soa.esb.persistence.db.pool.timeout.millis"	value="5000"/> 
+                <property name="org.jboss.soa.esb.persistence.db.conn.manager" value="org.jboss.internal.soa.esb.persistence.manager.StandaloneConnectionManager"/>
+    </properties>
+    <properties name="messagerouting">
+    	<property name="org.jboss.soa.esb.routing.cbrClass" value="org.jboss.internal.soa.esb.services.routing.cbr.JBossRulesRouter"/>
+    </properties>
+</esb>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jndi.properties
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jndi.properties	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/jndi.properties	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,5 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming
+java.naming.factory.url.pkgs=org.jnp.interfaces
+

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/log4j.xml
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/log4j.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/log4j.xml	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!--                                                                       -->
+<!--  Log4j Configuration                                                  -->
+<!--                                                                       -->
+<!-- ===================================================================== -->
+
+<!-- $Id: log4j.xml,v 1.26.2.5 2005/09/15 09:31:02 dimitris Exp $ -->
+
+<!--
+   | For more configuration information and examples see the Jakarta Log4j
+   | website: http://jakarta.apache.org/log4j
+ -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+   <!-- ============================== -->
+   <!-- Append messages to the console -->
+   <!-- ============================== -->
+
+   <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+      <errorHandler class="org.apache.log4j.helpers.OnlyOnceErrorHandler"/>
+      <param name="Target" value="System.out"/>
+      <param name="Threshold" value="INFO"/>
+
+      <layout class="org.apache.log4j.PatternLayout">
+         <!-- The default pattern: Date Priority [Category] Message\n -->
+         <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p [%t][%c{1}] %m%n"/>
+      </layout>
+   </appender>
+
+   <!-- ================================= -->
+   <!-- Preserve messages in a local file -->
+   <!-- ================================= -->
+
+   <!-- A size based file rolling appender -->
+   <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+     <errorHandler class="org.apache.log4j.helpers.OnlyOnceErrorHandler"/>
+     <param name="File" value="./listener.log"/>
+     <param name="Append" value="false"/>
+     <param name="MaxFileSize" value="500KB"/>
+     <param name="MaxBackupIndex" value="1"/>
+
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d %-5p [%t][%c] %m%n"/>
+     </layout>	    
+   </appender>
+
+   <!-- ================ -->
+   <!-- Limit categories -->
+   <!-- ================ -->
+
+   <category name="org.jboss">
+      <priority value="WARN"/>
+   </category>
+   <category name="org.jboss.soa.esb">
+      <priority value="ERROR"/>
+   </category>
+   <category name="org.jboss.internal.soa.esb">
+      <priority value="ERROR"/>
+   </category>
+   <category name="org.apache">
+      <priority value="ERROR"/>
+   </category>
+   <category name="quickstart">
+      <priority value="DEBUG"/>
+   </category>
+   <!-- ======================= -->
+   <!-- Setup the Root category -->
+   <!-- ======================= -->
+
+   <root>
+      <appender-ref ref="CONSOLE"/>
+      <appender-ref ref="FILE"/>
+   </root>
+
+</log4j:configuration>

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/readme.txt
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/readme.txt	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/readme.txt	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,39 @@
+Overview:
+=========
+  The purpose of the streaming_aggregator quickstart sample is to demonstrate
+  the functionality of the StreamingAggregator.     
+
+  The streaming_aggregator quickstart ant "runtest" target creates a file
+  with a number of lines in it (100 lines; you can configure it differently in
+  the build.xml).   This file is picked up by the file gateway, and a custom 
+  composer (IncomingComposer) splits each line in the file into its own Message.
+
+  The Messages are then aggregated.    The IncomingComposer adds aggregation
+  details into the message telling the StreamingAggregator the unique ID
+  of the aggregated message group and the message order within that group.
+ 
+  The benefit of org.jboss.soa.esb.actions.aggregator.StreamingAggregator
+  over the standard JBoss ESB aggregator (org.jboss.soa.esb.actions.Aggregator)
+  is that the standard aggregator requires you to know how many items you
+  will be aggregating at the moment at which you send the first item.    If
+  you are reading in a very large file to split, you would need to read in the
+  whole file before starting to split, or you would have to run through the
+  file preemptively to count the number of items you would be splitting.
+ 
+  The StreamingAggregator on the other hand allows you to start sending
+  immediately.    Once you are finished splitting, you can then send a message
+  that will inform the aggregator what the final count was so that it can check
+  whether it has received all of the split messages. 
+
+Running this quickstart:
+========================
+  Please refer to 'ant help-quickstarts' for prerequisites about the quickstarts
+  and a more detailed description of the different ways to run the quickstarts.
+
+To Run:
+===========================
+  1. In a command terminal window in this folder ("Window1"), type 'ant deploy'.
+  2. Open another command terminal window in this folder ("Window2"), type
+     'ant runtest'.
+  3. Switch back to the Application Server console to see the output from the ESB.
+  4. In the first terminal window ("Window1"), type 'ant undeploy'.

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/IncomingComposer.java
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/IncomingComposer.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/IncomingComposer.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2012,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.samples.quickstart.sampleaggregator;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+
+import org.jboss.soa.esb.actions.aggregator.AggregateDetails;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageComposer;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+public class IncomingComposer implements MessageComposer<File>
+{
+    private static final Logger LOGGER = Logger.getLogger(IncomingComposer.class) ;
+
+    private Service service ;
+    private ServiceInvoker serviceInvoker ;
+
+    @Override
+    public void setConfiguration(final ConfigTree config)
+        throws ConfigurationException
+    {
+        service = Service.getGatewayTargetService(config) ;
+        try
+        {
+            serviceInvoker = new ServiceInvoker(service) ;
+        }
+        catch (final MessageDeliverException mde)
+        {
+            throw new ConfigurationException("Unexpected exception while creating service invoker", mde.getCause()) ;
+        }
+    }
+
+    @Override
+    public Message compose(final File file)
+        throws MessageDeliverException
+    {
+        final MessageFactory messageFactory = MessageFactory.getInstance() ;
+        Message message = null ;
+
+        int currentRecord = 0 ;
+        final String seriesUUID = UUID.randomUUID().toString() ;
+        LOGGER.info("Commence composition of series " + seriesUUID) ;
+
+        try
+        {
+            final FileReader fr = new FileReader(file) ;
+            try
+            {
+                final BufferedReader br = new BufferedReader(fr) ;
+                do
+                {
+                    final String line = br.readLine() ;
+                    if (line == null)
+                    {
+                        break ;
+                    }
+                    if (message != null)
+                    {
+                        serviceInvoker.deliverAsync(message) ;
+                    }
+                    message = messageFactory.getMessage() ;
+                    message.getBody().add(line) ;
+                    message.getProperties().setProperty(AggregateDetails.AGGREGATE_DETAILS, new AggregateDetails(seriesUUID, Integer.valueOf(currentRecord++))) ;
+                }
+                while(true) ;
+                if (message != null)
+                {
+                    ((AggregateDetails)message.getProperties().getProperty(AggregateDetails.AGGREGATE_DETAILS)).setSequenceCount(Integer.valueOf(currentRecord)) ;
+                }
+            }
+            finally
+            {
+                try
+                {
+                    fr.close() ;
+                }
+                catch (final IOException ioe) {} // ignore on close
+                LOGGER.info("Finishing composition of series " + seriesUUID) ;
+            }
+        }
+        catch (final IOException ioe)
+        {
+            throw new MessageDeliverException("Unexpected IO exception while processing " + file.getAbsolutePath(), ioe) ;
+        }
+        return message ;
+    }
+
+    @Override
+    public Object decompose(final Message message, final File originalInputMessagePayload) throws MessageDeliverException
+    {
+        throw new MessageDeliverException("Not supported") ;
+    }
+}

Added: labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/test/CreateTestFile.java
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/test/CreateTestFile.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/samples/quickstarts/streaming_aggregator/src/org/jboss/soa/esb/samples/quickstart/sampleaggregator/test/CreateTestFile.java	2012-03-14 19:25:08 UTC (rev 37921)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2012,
+ * @author JBoss Inc.
+ */
+package org.jboss.soa.esb.samples.quickstart.sampleaggregator.test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+public class CreateTestFile {
+    
+    public static void main(String args[]) throws Exception
+    {        	    	
+    	String inputDirectory = args[0];
+    	String fileName = args[1];
+    	int lines = Integer.parseInt(args[2]);
+    	File x = new File(inputDirectory + "/" + fileName);
+    	try {
+    		BufferedWriter out = new BufferedWriter(new FileWriter(x));
+    		for (int i = 1; i <= lines; i++) {
+    			out.write( (i + " line of file").toCharArray() );
+    			out.newLine();
+    		}
+    		out.close();
+    	} catch (Exception e) {
+    		System.out.println("Error while writing the file: " + inputDirectory + "/" + fileName);
+    		System.out.println(e.getMessage());
+    	}
+    }
+    
+}
\ No newline at end of file



More information about the jboss-svn-commits mailing list