[jboss-svn-commits] JBL Code SVN: r9183 - in labs/jbossesb/trunk: product/core/services/src/org/jboss/soa/esb/services/routing and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Jan 31 03:28:04 EST 2007


Author: kurt.stam at jboss.com
Date: 2007-01-31 03:28:04 -0500 (Wed, 31 Jan 2007)
New Revision: 9183

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
   labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
   labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
Adding Splitter/Aggregator actions

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java	2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,245 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * Routes the Message argument to a fixed list of services ([category,name]) 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+package org.jboss.soa.esb.actions;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.services.routing.MessageRouter;
+import org.jboss.soa.esb.services.routing.MessageRouterException;
+
+/**
+ * Simple Aggregator. The aggregator relies on 'aggregatorTags'. To puzzle the individual
+ * back together. The aggregatorTag is set in the MessageRouter.deliver() method. The aggregator
+ * adds a collected message in the series as attachements to a new message. When all
+ * messages are received or if we are timeout an aggregated message is returned. 
+ * In all other cases null is returned.
+ * 
+ * Future enhancement should be 
+ *  - all sort of waitfor/timout algorithms.
+ *  - persisting the map so no messages get lost.
+ *  - make the map managable.
+ * 
+ * @author kurt.stam at redhat.com
+ *
+ */
+public class Aggregator
+{
+	public static ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
+        = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
+    private static TimeoutChecker _timeoutChecker=null;
+    private static ArrayList<String> _notified = new ArrayList<String>();
+    
+    protected ConfigTree config;
+    private Logger logger = Logger.getLogger(Aggregator.class);
+    private Long timeoutInMillies=null;
+    private String serviceName;
+    private String serviceCategoryName;
+    private Registry registry;
+    
+	private Aggregator(){}
+
+	public Aggregator(ConfigTree config) throws ConfigurationException, RegistryException
+	{
+        if (_timeoutChecker==null) {
+        // On startup we should re-populate the _aggregatedMessageMap from permanent storage
+            _timeoutChecker = new TimeoutChecker();
+            _timeoutChecker.start();
+        }
+        
+        this.config = config;
+		checkMyParms();
+        registry = RegistryFactory.getRegistry();
+    }
+    /**
+     * Processes an incoming message, aggregates messages in a set and returns a aggregated message
+     * when isComplete() is satisfied. The aggregated messages are set as attachments to this message.
+     * Next the message can be send to a transformer to do the second part of the aggregation which is 
+     * to convert the attachement messages into to one message.
+     * 
+     * @param message
+     * @return a aggregated message, or null if the aggregation has not been completed.
+     * @throws MessageRouterException
+     */
+	public Message process(Message message) throws MessageRouterException
+	{
+		String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+        if (aggregatorTag!=null) {
+            String[] tag = aggregatorTag.split(":");
+            String uuId = tag[0];
+            String messageNumber = tag[1];
+            int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
+             
+            if (isTimedOut(message)) {
+                if (_aggregatedMessageMap.containsKey(uuId)) {
+                    ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
+                    //add the message in if we don't already have it
+                    if (!messageMap.containsKey(uuId)) {
+                         messageMap.put(messageNumber, message);
+                    }
+                    //Just going to send out what we have this far, which will remove this key
+                    //so subsequent messages with this uuId will be ignored
+                    message = createAggregateMessage(uuId, _aggregatedMessageMap.get(uuId));
+                } else {
+                    logger.debug("Ignoring this message since we are already timedout on this message.");
+                    //ignoring this message
+                    message = null;
+                }
+            } else {
+                ConcurrentHashMap<String, Message> messageMap=null;
+                //Get the messageMap with this uuId, or create it if not found
+                if (_aggregatedMessageMap.containsKey(uuId)) {
+                    messageMap = _aggregatedMessageMap.get(uuId);
+                } else {
+                    messageMap = new ConcurrentHashMap<String, Message>();
+                }
+                if (messageMap.containsKey(messageNumber)) {
+                    logger.warn("Received duplicate message, ignoring it but this should not happen.");
+                } else {
+                    messageMap.put(messageNumber, message);
+                    _aggregatedMessageMap.put(uuId, messageMap);
+                } 
+                if (messageMap.size()==totalNumberOfMessages) {
+                    message = createAggregateMessage(uuId, messageMap);
+                } else {
+                    message = null;
+                }
+            }
+        } else {
+            throw new MessageRouterException("Could not find an aggregator tag, so this message can not be aggregated.");
+        }
+		return message;
+	}
+	/**
+     * Obtains the timeoutInMillies to a number
+     * 
+     * @throws ConfigurationException
+	 */
+	private void checkMyParms() throws ConfigurationException 
+    {
+        timeoutInMillies     = Long.valueOf(config.getAttribute("timeoutInMillies"));
+        logger.debug("Aggregator config:  timeoutInMillies=" + timeoutInMillies);
+        serviceName          = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+        serviceCategoryName  = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+    }
+    /**
+     * Aggregates the messages into 1 new message with an attachment for each message.
+     * 
+     * @param uuId
+     * @param messageMap
+     * @return the aggregated message
+     */
+    private Message createAggregateMessage(String uuId, ConcurrentHashMap<String, Message> messageMap) 
+    {
+        //Create an aggregated message
+        Message aggregatedMessage = MessageFactory.getInstance().getMessage();
+        for (Message oneMessage : messageMap.values()) {
+            //Add the individual messages as attachments
+                aggregatedMessage.getAttachment().addItem(oneMessage);
+        }
+        _aggregatedMessageMap.remove(uuId);
+        //TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
+        //remove from the notificationMap if it is in there.
+        _notified.remove(uuId);
+        
+        return aggregatedMessage;
+    }
+    /**
+     * If the aggregation process is complete then return true. This depends on the configuration. 
+     * 
+     * @param totalNumberOfMessages
+     * @param splitterTimeStamp
+     * @param messageMap
+     * @return
+     */
+    private boolean isTimedOut(Message message) 
+    {
+        long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP);
+        if (timeoutInMillies!=null) {
+            long now = new java.util.Date().getTime();
+            if (splitterTimeStamp + Long.valueOf(timeoutInMillies) > now) {
+                return true;
+            }
+        }
+        return false;
+    }
+    /**
+     * Checks for message that are timed out. If we find that one we notify ourselves about it by resending the message.
+     * 
+     * @author kstam
+     *
+     */
+    class TimeoutChecker extends Thread {
+        public void run() {
+            while(true) {
+                for (ConcurrentHashMap< String, Message > messageMap : _aggregatedMessageMap.values()) {
+                    //Check the first message, they all have the same time stamp
+                    Message message = messageMap.values().iterator().next();
+                    if (isTimedOut(message)) {
+                        //We found a timed-out message. Let's go notify ourselves about by resending a message,
+                        //it if we haven't done so already
+                        String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+                        String[] tag = aggregatorTag.split(":");
+                        String uuId = tag[0];
+                        if (!_notified.contains(uuId)) {
+                            _notified.add(uuId);
+                            logger.debug("Found timeout message.");
+                            try {
+                                EPR epr = registry.findEPR(serviceCategoryName, serviceName);
+                                Courier courier = CourierFactory.getCourier(epr);
+                                courier.deliver(message);
+                            } catch (Exception e) {
+                                logger.error(e.getMessage(), e);
+                                //If we can't notify then drop this data
+                                logger.debug("Deleting data for message series with uuId=" + uuId);
+                                _notified.remove(uuId);
+                                _aggregatedMessageMap.remove(uuId);
+                            }
+                        }
+                    }
+                }
+                try {
+                    Thread.sleep(500);
+                } catch (Exception e){}
+            }
+        }
+    }
+ 
+}


Property changes on: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java	2007-01-31 08:26:53 UTC (rev 9182)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java	2007-01-31 08:28:04 UTC (rev 9183)
@@ -32,15 +32,12 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
 import org.jboss.soa.esb.services.routing.MessageRouter;
 import org.jboss.soa.esb.services.routing.MessageRouterException;
 
@@ -56,16 +53,15 @@
 	{
 		_config = config;
 		checkMyParms();
-		_registry = RegistryFactory.getRegistry();
 	} // ________________________________
 
 	public Message process(Message message) throws MalformedEPRException, RegistryException, CourierException
 	{
 		try
 		{
-			Call call = message.getHeader().getCall();
-			if (null == call || null == call.getMessageID())
-				throw new IllegalArgumentException("Null message ID");
+		//	Call call = message.getHeader().getCall();
+		//	if (null == call || null == call.getMessageID())
+		//		throw new IllegalArgumentException("Null message ID");
 	
 			MessageRouter.deliverMessages(_destinations, message);
 			return message;
@@ -93,7 +89,7 @@
 				String name = curr.getRequiredAttribute(
 						ListenerTagNames.SERVICE_NAME_TAG);
 				_destinations.add(new String[]
-				{ category, name });
+				{ category, name});
 			}
 			catch (Exception e)
 			{
@@ -106,8 +102,6 @@
 
 	protected List<String[]> _destinations;
 
-	protected Registry _registry;
-
 	protected static Logger _logger = Logger.getLogger(StaticRouter.class);
 
 }

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-01-31 08:26:53 UTC (rev 9182)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-01-31 08:28:04 UTC (rev 9183)
@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
@@ -49,9 +50,9 @@
 public abstract class MessageRouter
 {
 	private static Logger logger = Logger.getLogger(MessageRouter.class);
-
+	public final static String AGGEGRATOR_TAG = "aggregatorTag";
+    public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
 	
-
 	/**
 	 * Routes the message to the next destination.
 	 * 
@@ -74,8 +75,16 @@
 	public synchronized static void deliverMessages(
 			Collection<String[]> destinations, Message message) throws MessageRouterException
 	{
+        String uuId = UUID.randomUUID().toString();
+        int counter=0;
 		for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
 		{
+            message.getProperties().setProperty(AGGEGRATOR_TAG, uuId + ":" + ++counter + ":" + destinations.size());
+            message.getProperties().setProperty(SPLITTER_TIME_STAMP, new java.util.Date().getTime());
+            if (logger.isDebugEnabled()) {
+                String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG);
+                logger.debug(AGGEGRATOR_TAG + "=" + tag);
+            }
             String[] destination = (String[]) i.next();
 			String category = destination[0];
 			String serviceName = destination[1];

Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java	2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,277 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.InputStream;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.listeners.StandAloneBootStrapper;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.testutils.FileUtil;
+import org.jboss.soa.esb.testutils.HsqldbUtil;
+import org.jboss.soa.esb.testutils.TestEnvironmentUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Testing the Content Based Router.
+ * 
+ * @author <a href="mailto:kurt.stam at redhat.com">Kurt Stam</a>
+ * @author <a href="mailto:schifest at heuristica.com.ar">Esteban</a>
+ * @since Version 4.0
+ * 
+ */
+public class AggregatorTest
+{
+	private static Logger logger = Logger.getLogger(AggregatorTest.class);
+
+	private static StandAloneBootStrapper _boot = null;
+
+	private static String mDbDriver;
+
+	private static String mDbUrl;
+
+	private static String mDbUsername;
+
+	private static String mDbPassword;
+
+	private static final String SERVICE_CATEGORY_NAME = "MessageRouting";
+
+	private static final String SERVICE_NAME = "SplitterService";
+
+	/**
+	 * Testing the Content Based Router.
+	 */
+	@Test
+	public void sendMessages()
+	{
+		try
+		{
+			sendMessage("First Message");
+			// The second time the rulesEngine should be primed.
+			sendMessage("Second Message");
+		}
+		catch (RegistryException re)
+		{
+			re.printStackTrace();
+			assertTrue(false);
+		}
+		catch (CourierException ce)
+		{
+			ce.printStackTrace();
+			assertTrue(false);
+		}
+		catch (MalformedEPRException me)
+		{
+			me.printStackTrace();
+			assertTrue(false);
+		}
+	}
+
+	/**
+	 * Sends a message to the CbrJmsQueueListener.
+	 * 
+	 * @param body -
+	 *            a String containing the body of the message.
+	 * @throws RegistryException
+	 * @throws CourierException
+	 * @throws MalformedEPRException
+	 */
+	
+	private static void sendMessage(String body) throws RegistryException, CourierException, MalformedEPRException
+	{
+		Message msg = MessageFactory.getInstance().getMessage();
+		msg.getBody().setContents(body.getBytes());
+		
+		Registry registry = RegistryFactory.getRegistry();
+		Collection<EPR> eprs = registry.findEPRs(SERVICE_CATEGORY_NAME,
+				SERVICE_NAME);
+		for (Iterator<EPR> eprIterator = eprs.iterator(); eprIterator.hasNext();)
+		{
+			// Just use the first EPR in the list.
+			EPR epr = eprIterator.next();
+			Courier courier = CourierFactory.getCourier(epr);
+			courier.deliver(msg);
+			break;
+		}
+	}
+
+	public static junit.framework.Test suite()
+	{
+		return new JUnit4TestAdapter(AggregatorTest.class);
+	}
+
+	@BeforeClass
+	public static void runBeforeAllTests()
+	{
+		try
+		{
+			DOMConfigurator.configure(TestEnvironmentUtil.getUserDir("product",
+					"../product")
+					+ "etc/test/resources/log4j.xml");
+			TestEnvironmentUtil.setESBPropertiesFileToUse("product",
+					"../product");
+			// Set the juddi properties file in System so juddi will pick it up
+			// later and use the test values.
+			String juddiPropertiesFile = "/org/jboss/soa/esb/services/registry/juddi-qatest.properties";
+			System.setProperty("juddi.propertiesFile", juddiPropertiesFile);
+			// Read this properties file to get the db connection string
+			Properties props = new Properties();
+			InputStream inStream = Class.class
+					.getResourceAsStream(juddiPropertiesFile);
+			props.load(inStream);
+			mDbDriver = props.getProperty("juddi.jdbcDriver");
+			mDbUrl = props.getProperty("juddi.jdbcUrl");
+			mDbUsername = props.getProperty("juddi.jdbcUsername");
+			mDbPassword = props.getProperty("juddi.jdbcPassword");
+
+			String database = "not tested yet";
+			if ("org.hsqldb.jdbcDriver".equals(mDbDriver))
+			{
+				database = "hsqldb";
+				// Bring up hsql on default port 9001
+				HsqldbUtil.startHsqldb(TestEnvironmentUtil.getUserDir(
+						"product", "../product")
+						+ "build/hsqltestdb", "juddi");
+			}
+			else if ("com.mysql.jdbc.Driver".equals(mDbDriver))
+			{
+				database = "mysql";
+			} // add and test your own database..
+
+			// Get the registry-schema create scripts
+			String sqlDir = TestEnvironmentUtil.getUserDir("product",
+					"../product")
+					+ "install/jUDDI-registry/sql/" + database + "/";
+			// Drop what is there now, if exists. We want to start fresh.
+			String sqlDropCmd = FileUtil.readTextFile(new File(sqlDir
+					+ "drop_database.sql"));
+			String sqlCreateCmd = FileUtil.readTextFile(new File(sqlDir
+					+ "create_database.sql"));
+			String sqlInsertPubCmd = FileUtil.readTextFile(new File(sqlDir
+					+ "insert_publishers.sql"));
+
+			try
+			{
+				Class.forName(mDbDriver);
+			}
+			catch (Exception e)
+			{
+				System.out.println("ERROR: failed to load " + database
+						+ " JDBC driver.");
+				e.printStackTrace();
+				return;
+			}
+			java.sql.Connection con = DriverManager.getConnection(mDbUrl,
+					mDbUsername, mDbPassword);
+			Statement stmnt = con.createStatement();
+			System.out.println("Dropping the schema if exist");
+			stmnt.execute(sqlDropCmd);
+			System.out.println("Creating the juddi-schema");
+			stmnt.execute(sqlCreateCmd);
+			System.out.println("Adding the jbossesb publisher");
+			stmnt.execute(sqlInsertPubCmd);
+
+			// Now we can bring up the ContentBasedRouter
+			String deploymentConfigFile = TestEnvironmentUtil.getUserDir("qa")
+					+ "junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml";
+			String validationFileName = TestEnvironmentUtil.getUserDir(
+					"product", "../product")
+					+ "etc/schemas/xml/jbossesb-1.0.xsd";
+			// Make sure this file exists
+			File validationFile = new File(validationFileName);
+			if (!validationFile.exists())
+			{
+				System.err.println("Validation file "
+						+ validationFile.getAbsolutePath() + " does not exist");
+				assertTrue(false);
+			}
+			_boot = new StandAloneBootStrapper(deploymentConfigFile,
+					validationFileName);
+
+			logger
+					.info("Testing to see if we can instantiate and start ListenerManager");
+		}
+		catch (Throwable e)
+		{
+			e.printStackTrace();
+			System.out
+					.println("We should stop testing, since we don't have a db.");
+			assertTrue(false);
+		}
+
+	}
+
+	/**
+	 * Shutdown the database
+	 * 
+	 * @throws Exception
+	 */
+	@AfterClass
+	public static void runAfterAllTests() throws Exception
+	{
+        Thread.sleep(120000);
+		// Increase Sleep for debugging
+		_boot.requestEnd();
+		// Give the esb time to finish
+		Thread.sleep(2000);
+		// Cleaning up the generated files
+		String listenerConfigFile = TestEnvironmentUtil.getUserDir("qa")
+				+ "junit/src/org/jboss/soa/esb/actions/jbossesb-listener.xml";
+		File listenerFile = new File(listenerConfigFile);
+		if (listenerFile.exists())
+			listenerFile.delete();
+		String gatewayConfigFile = TestEnvironmentUtil.getUserDir("qa")
+				+ "junit/src/org/jboss/soa/esb/actions/jbossesb-gateway.xml";
+		File gatewayFile = new File(gatewayConfigFile);
+		if (gatewayFile.exists())
+			gatewayFile.delete();
+
+		if ("org.hsqldb.jdbcDriver".equals(mDbDriver))
+		{
+			HsqldbUtil.stopHsqldb(mDbUrl, mDbUsername, mDbPassword);
+		}
+	}
+
+}


Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml	                        (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml	2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,78 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.xsd"
+parameterReloadSecs="10">
+
+    <!-- for activemq use:
+        jndi-context-factory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+        jndi-URL="tcp://localhost:61616" -->
+    
+    <!--  for mq series use: 
+        jndi-context-factory="com.ibm.mq.jms.context.WMQInitialContextFactory"
+        jndi-URL="dev37:1414/SYSTEM.DEF.SVRCONN" -->
+    
+    <!--  for jbossmq use:
+        jndi-context-factory="org.jnp.interfaces.NamingContextFactory"
+        jndi-URL="localhost" -->
+    
+
+	<providers>
+          <jms-provider name="localhost" 
+                      connection-factory="ConnectionFactory"
+                      jndi-context-factory="org.jnp.interfaces.NamingContextFactory"
+                      jndi-URL="localhost" >
+                      
+              <jms-bus busid="QueueA">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/A"
+                   />
+              </jms-bus>
+              <jms-bus busid="QueueB">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/B"
+                  />
+              </jms-bus>
+          </jms-provider>
+      </providers>
+      <services>
+          <service 
+              category="Aggregation" 
+              name="Aggregrator" 
+              description="Aggregates messages">
+              <listeners>
+                  <jms-listener name="AggregatorListenQueue" busidref="QueueA"
+                      maxThreads="1">
+                  </jms-listener>
+              </listeners>
+              <actions>
+                  <action class="org.jboss.soa.esb.actions.Aggregator" name="Aggregator">
+                      <property name="timeoutInMillies" value="10000"/>
+                  </action>
+              </actions> 
+          </service>
+          <service 
+        	category="MessageRouting" 
+        	name="SplitterService" 
+        	description="Sends messages to 2 destinations">
+        	<listeners>        
+                  <jms-listener name="SplitterListenQueue"
+                              busidref="QueueB"
+                              maxThreads="1">          
+	             </jms-listener>
+             </listeners>
+            <actions>
+                <action name="print-before" class="org.jboss.soa.esb.actions.SystemPrintln">
+                    <property name="message" value="Hello Static Router" />
+                </action>
+                <action class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
+                    <property name="destinations">
+                        <!-- send 2 copies straight to the aggregator -->
+                        <route-to service-category="Aggregation" service-name="Aggregrator" />
+                        <route-to service-category="Aggregation" service-name="Aggregrator" />
+                    </property> 
+                </action>
+            </actions> 
+        </service>
+   </services>
+</jbossesb>


Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
___________________________________________________________________
Name: svn:mime-type
   + text/xml
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list