[jboss-svn-commits] JBL Code SVN: r9183 - in labs/jbossesb/trunk: product/core/services/src/org/jboss/soa/esb/services/routing and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Jan 31 03:28:04 EST 2007
Author: kurt.stam at jboss.com
Date: 2007-01-31 03:28:04 -0500 (Wed, 31 Jan 2007)
New Revision: 9183
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
Adding Splitter/Aggregator actions
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java 2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,245 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * Routes the Message argument to a fixed list of services ([category,name])
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+package org.jboss.soa.esb.actions;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.services.routing.MessageRouter;
+import org.jboss.soa.esb.services.routing.MessageRouterException;
+
+/**
+ * Simple Aggregator. The aggregator relies on 'aggregatorTags'. To puzzle the individual
+ * back together. The aggregatorTag is set in the MessageRouter.deliver() method. The aggregator
+ * adds a collected message in the series as attachements to a new message. When all
+ * messages are received or if we are timeout an aggregated message is returned.
+ * In all other cases null is returned.
+ *
+ * Future enhancement should be
+ * - all sort of waitfor/timout algorithms.
+ * - persisting the map so no messages get lost.
+ * - make the map managable.
+ *
+ * @author kurt.stam at redhat.com
+ *
+ */
+public class Aggregator
+{
+ public static ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
+ = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
+ private static TimeoutChecker _timeoutChecker=null;
+ private static ArrayList<String> _notified = new ArrayList<String>();
+
+ protected ConfigTree config;
+ private Logger logger = Logger.getLogger(Aggregator.class);
+ private Long timeoutInMillies=null;
+ private String serviceName;
+ private String serviceCategoryName;
+ private Registry registry;
+
+ private Aggregator(){}
+
+ public Aggregator(ConfigTree config) throws ConfigurationException, RegistryException
+ {
+ if (_timeoutChecker==null) {
+ // On startup we should re-populate the _aggregatedMessageMap from permanent storage
+ _timeoutChecker = new TimeoutChecker();
+ _timeoutChecker.start();
+ }
+
+ this.config = config;
+ checkMyParms();
+ registry = RegistryFactory.getRegistry();
+ }
+ /**
+ * Processes an incoming message, aggregates messages in a set and returns a aggregated message
+ * when isComplete() is satisfied. The aggregated messages are set as attachments to this message.
+ * Next the message can be send to a transformer to do the second part of the aggregation which is
+ * to convert the attachement messages into to one message.
+ *
+ * @param message
+ * @return a aggregated message, or null if the aggregation has not been completed.
+ * @throws MessageRouterException
+ */
+ public Message process(Message message) throws MessageRouterException
+ {
+ String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+ if (aggregatorTag!=null) {
+ String[] tag = aggregatorTag.split(":");
+ String uuId = tag[0];
+ String messageNumber = tag[1];
+ int totalNumberOfMessages = Integer.valueOf(tag[2]).intValue();
+
+ if (isTimedOut(message)) {
+ if (_aggregatedMessageMap.containsKey(uuId)) {
+ ConcurrentHashMap<String, Message> messageMap = _aggregatedMessageMap.get(uuId);
+ //add the message in if we don't already have it
+ if (!messageMap.containsKey(uuId)) {
+ messageMap.put(messageNumber, message);
+ }
+ //Just going to send out what we have this far, which will remove this key
+ //so subsequent messages with this uuId will be ignored
+ message = createAggregateMessage(uuId, _aggregatedMessageMap.get(uuId));
+ } else {
+ logger.debug("Ignoring this message since we are already timedout on this message.");
+ //ignoring this message
+ message = null;
+ }
+ } else {
+ ConcurrentHashMap<String, Message> messageMap=null;
+ //Get the messageMap with this uuId, or create it if not found
+ if (_aggregatedMessageMap.containsKey(uuId)) {
+ messageMap = _aggregatedMessageMap.get(uuId);
+ } else {
+ messageMap = new ConcurrentHashMap<String, Message>();
+ }
+ if (messageMap.containsKey(messageNumber)) {
+ logger.warn("Received duplicate message, ignoring it but this should not happen.");
+ } else {
+ messageMap.put(messageNumber, message);
+ _aggregatedMessageMap.put(uuId, messageMap);
+ }
+ if (messageMap.size()==totalNumberOfMessages) {
+ message = createAggregateMessage(uuId, messageMap);
+ } else {
+ message = null;
+ }
+ }
+ } else {
+ throw new MessageRouterException("Could not find an aggregator tag, so this message can not be aggregated.");
+ }
+ return message;
+ }
+ /**
+ * Obtains the timeoutInMillies to a number
+ *
+ * @throws ConfigurationException
+ */
+ private void checkMyParms() throws ConfigurationException
+ {
+ timeoutInMillies = Long.valueOf(config.getAttribute("timeoutInMillies"));
+ logger.debug("Aggregator config: timeoutInMillies=" + timeoutInMillies);
+ serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ }
+ /**
+ * Aggregates the messages into 1 new message with an attachment for each message.
+ *
+ * @param uuId
+ * @param messageMap
+ * @return the aggregated message
+ */
+ private Message createAggregateMessage(String uuId, ConcurrentHashMap<String, Message> messageMap)
+ {
+ //Create an aggregated message
+ Message aggregatedMessage = MessageFactory.getInstance().getMessage();
+ for (Message oneMessage : messageMap.values()) {
+ //Add the individual messages as attachments
+ aggregatedMessage.getAttachment().addItem(oneMessage);
+ }
+ _aggregatedMessageMap.remove(uuId);
+ //TODO remove messageMap from permanent storage, or do that per message in the loop above using value of the aggregatorTag
+ //remove from the notificationMap if it is in there.
+ _notified.remove(uuId);
+
+ return aggregatedMessage;
+ }
+ /**
+ * If the aggregation process is complete then return true. This depends on the configuration.
+ *
+ * @param totalNumberOfMessages
+ * @param splitterTimeStamp
+ * @param messageMap
+ * @return
+ */
+ private boolean isTimedOut(Message message)
+ {
+ long splitterTimeStamp = (Long) message.getProperties().getProperty(MessageRouter.SPLITTER_TIME_STAMP);
+ if (timeoutInMillies!=null) {
+ long now = new java.util.Date().getTime();
+ if (splitterTimeStamp + Long.valueOf(timeoutInMillies) > now) {
+ return true;
+ }
+ }
+ return false;
+ }
+ /**
+ * Checks for message that are timed out. If we find that one we notify ourselves about it by resending the message.
+ *
+ * @author kstam
+ *
+ */
+ class TimeoutChecker extends Thread {
+ public void run() {
+ while(true) {
+ for (ConcurrentHashMap< String, Message > messageMap : _aggregatedMessageMap.values()) {
+ //Check the first message, they all have the same time stamp
+ Message message = messageMap.values().iterator().next();
+ if (isTimedOut(message)) {
+ //We found a timed-out message. Let's go notify ourselves about by resending a message,
+ //it if we haven't done so already
+ String aggregatorTag = (String) message.getProperties().getProperty(MessageRouter.AGGEGRATOR_TAG);
+ String[] tag = aggregatorTag.split(":");
+ String uuId = tag[0];
+ if (!_notified.contains(uuId)) {
+ _notified.add(uuId);
+ logger.debug("Found timeout message.");
+ try {
+ EPR epr = registry.findEPR(serviceCategoryName, serviceName);
+ Courier courier = CourierFactory.getCourier(epr);
+ courier.deliver(message);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ //If we can't notify then drop this data
+ logger.debug("Deleting data for message series with uuId=" + uuId);
+ _notified.remove(uuId);
+ _aggregatedMessageMap.remove(uuId);
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(500);
+ } catch (Exception e){}
+ }
+ }
+ }
+
+}
Property changes on: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/Aggregator.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java 2007-01-31 08:26:53 UTC (rev 9182)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/StaticRouter.java 2007-01-31 08:28:04 UTC (rev 9183)
@@ -32,15 +32,12 @@
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.registry.RegistryFactory;
import org.jboss.soa.esb.services.routing.MessageRouter;
import org.jboss.soa.esb.services.routing.MessageRouterException;
@@ -56,16 +53,15 @@
{
_config = config;
checkMyParms();
- _registry = RegistryFactory.getRegistry();
} // ________________________________
public Message process(Message message) throws MalformedEPRException, RegistryException, CourierException
{
try
{
- Call call = message.getHeader().getCall();
- if (null == call || null == call.getMessageID())
- throw new IllegalArgumentException("Null message ID");
+ // Call call = message.getHeader().getCall();
+ // if (null == call || null == call.getMessageID())
+ // throw new IllegalArgumentException("Null message ID");
MessageRouter.deliverMessages(_destinations, message);
return message;
@@ -93,7 +89,7 @@
String name = curr.getRequiredAttribute(
ListenerTagNames.SERVICE_NAME_TAG);
_destinations.add(new String[]
- { category, name });
+ { category, name});
}
catch (Exception e)
{
@@ -106,8 +102,6 @@
protected List<String[]> _destinations;
- protected Registry _registry;
-
protected static Logger _logger = Logger.getLogger(StaticRouter.class);
}
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-01-31 08:26:53 UTC (rev 9182)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-01-31 08:28:04 UTC (rev 9183)
@@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
@@ -49,9 +50,9 @@
public abstract class MessageRouter
{
private static Logger logger = Logger.getLogger(MessageRouter.class);
-
+ public final static String AGGEGRATOR_TAG = "aggregatorTag";
+ public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
-
/**
* Routes the message to the next destination.
*
@@ -74,8 +75,16 @@
public synchronized static void deliverMessages(
Collection<String[]> destinations, Message message) throws MessageRouterException
{
+ String uuId = UUID.randomUUID().toString();
+ int counter=0;
for (Iterator<String[]> i = destinations.iterator(); i.hasNext();)
{
+ message.getProperties().setProperty(AGGEGRATOR_TAG, uuId + ":" + ++counter + ":" + destinations.size());
+ message.getProperties().setProperty(SPLITTER_TIME_STAMP, new java.util.Date().getTime());
+ if (logger.isDebugEnabled()) {
+ String tag = (String) message.getProperties().getProperty(AGGEGRATOR_TAG);
+ logger.debug(AGGEGRATOR_TAG + "=" + tag);
+ }
String[] destination = (String[]) i.next();
String category = destination[0];
String serviceName = destination[1];
Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java 2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,277 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.InputStream;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.listeners.StandAloneBootStrapper;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.testutils.FileUtil;
+import org.jboss.soa.esb.testutils.HsqldbUtil;
+import org.jboss.soa.esb.testutils.TestEnvironmentUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Testing the Content Based Router.
+ *
+ * @author <a href="mailto:kurt.stam at redhat.com">Kurt Stam</a>
+ * @author <a href="mailto:schifest at heuristica.com.ar">Esteban</a>
+ * @since Version 4.0
+ *
+ */
+public class AggregatorTest
+{
+ private static Logger logger = Logger.getLogger(AggregatorTest.class);
+
+ private static StandAloneBootStrapper _boot = null;
+
+ private static String mDbDriver;
+
+ private static String mDbUrl;
+
+ private static String mDbUsername;
+
+ private static String mDbPassword;
+
+ private static final String SERVICE_CATEGORY_NAME = "MessageRouting";
+
+ private static final String SERVICE_NAME = "SplitterService";
+
+ /**
+ * Testing the Content Based Router.
+ */
+ @Test
+ public void sendMessages()
+ {
+ try
+ {
+ sendMessage("First Message");
+ // The second time the rulesEngine should be primed.
+ sendMessage("Second Message");
+ }
+ catch (RegistryException re)
+ {
+ re.printStackTrace();
+ assertTrue(false);
+ }
+ catch (CourierException ce)
+ {
+ ce.printStackTrace();
+ assertTrue(false);
+ }
+ catch (MalformedEPRException me)
+ {
+ me.printStackTrace();
+ assertTrue(false);
+ }
+ }
+
+ /**
+ * Sends a message to the CbrJmsQueueListener.
+ *
+ * @param body -
+ * a String containing the body of the message.
+ * @throws RegistryException
+ * @throws CourierException
+ * @throws MalformedEPRException
+ */
+
+ private static void sendMessage(String body) throws RegistryException, CourierException, MalformedEPRException
+ {
+ Message msg = MessageFactory.getInstance().getMessage();
+ msg.getBody().setContents(body.getBytes());
+
+ Registry registry = RegistryFactory.getRegistry();
+ Collection<EPR> eprs = registry.findEPRs(SERVICE_CATEGORY_NAME,
+ SERVICE_NAME);
+ for (Iterator<EPR> eprIterator = eprs.iterator(); eprIterator.hasNext();)
+ {
+ // Just use the first EPR in the list.
+ EPR epr = eprIterator.next();
+ Courier courier = CourierFactory.getCourier(epr);
+ courier.deliver(msg);
+ break;
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(AggregatorTest.class);
+ }
+
+ @BeforeClass
+ public static void runBeforeAllTests()
+ {
+ try
+ {
+ DOMConfigurator.configure(TestEnvironmentUtil.getUserDir("product",
+ "../product")
+ + "etc/test/resources/log4j.xml");
+ TestEnvironmentUtil.setESBPropertiesFileToUse("product",
+ "../product");
+ // Set the juddi properties file in System so juddi will pick it up
+ // later and use the test values.
+ String juddiPropertiesFile = "/org/jboss/soa/esb/services/registry/juddi-qatest.properties";
+ System.setProperty("juddi.propertiesFile", juddiPropertiesFile);
+ // Read this properties file to get the db connection string
+ Properties props = new Properties();
+ InputStream inStream = Class.class
+ .getResourceAsStream(juddiPropertiesFile);
+ props.load(inStream);
+ mDbDriver = props.getProperty("juddi.jdbcDriver");
+ mDbUrl = props.getProperty("juddi.jdbcUrl");
+ mDbUsername = props.getProperty("juddi.jdbcUsername");
+ mDbPassword = props.getProperty("juddi.jdbcPassword");
+
+ String database = "not tested yet";
+ if ("org.hsqldb.jdbcDriver".equals(mDbDriver))
+ {
+ database = "hsqldb";
+ // Bring up hsql on default port 9001
+ HsqldbUtil.startHsqldb(TestEnvironmentUtil.getUserDir(
+ "product", "../product")
+ + "build/hsqltestdb", "juddi");
+ }
+ else if ("com.mysql.jdbc.Driver".equals(mDbDriver))
+ {
+ database = "mysql";
+ } // add and test your own database..
+
+ // Get the registry-schema create scripts
+ String sqlDir = TestEnvironmentUtil.getUserDir("product",
+ "../product")
+ + "install/jUDDI-registry/sql/" + database + "/";
+ // Drop what is there now, if exists. We want to start fresh.
+ String sqlDropCmd = FileUtil.readTextFile(new File(sqlDir
+ + "drop_database.sql"));
+ String sqlCreateCmd = FileUtil.readTextFile(new File(sqlDir
+ + "create_database.sql"));
+ String sqlInsertPubCmd = FileUtil.readTextFile(new File(sqlDir
+ + "insert_publishers.sql"));
+
+ try
+ {
+ Class.forName(mDbDriver);
+ }
+ catch (Exception e)
+ {
+ System.out.println("ERROR: failed to load " + database
+ + " JDBC driver.");
+ e.printStackTrace();
+ return;
+ }
+ java.sql.Connection con = DriverManager.getConnection(mDbUrl,
+ mDbUsername, mDbPassword);
+ Statement stmnt = con.createStatement();
+ System.out.println("Dropping the schema if exist");
+ stmnt.execute(sqlDropCmd);
+ System.out.println("Creating the juddi-schema");
+ stmnt.execute(sqlCreateCmd);
+ System.out.println("Adding the jbossesb publisher");
+ stmnt.execute(sqlInsertPubCmd);
+
+ // Now we can bring up the ContentBasedRouter
+ String deploymentConfigFile = TestEnvironmentUtil.getUserDir("qa")
+ + "junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml";
+ String validationFileName = TestEnvironmentUtil.getUserDir(
+ "product", "../product")
+ + "etc/schemas/xml/jbossesb-1.0.xsd";
+ // Make sure this file exists
+ File validationFile = new File(validationFileName);
+ if (!validationFile.exists())
+ {
+ System.err.println("Validation file "
+ + validationFile.getAbsolutePath() + " does not exist");
+ assertTrue(false);
+ }
+ _boot = new StandAloneBootStrapper(deploymentConfigFile,
+ validationFileName);
+
+ logger
+ .info("Testing to see if we can instantiate and start ListenerManager");
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.out
+ .println("We should stop testing, since we don't have a db.");
+ assertTrue(false);
+ }
+
+ }
+
+ /**
+ * Shutdown the database
+ *
+ * @throws Exception
+ */
+ @AfterClass
+ public static void runAfterAllTests() throws Exception
+ {
+ Thread.sleep(120000);
+ // Increase Sleep for debugging
+ _boot.requestEnd();
+ // Give the esb time to finish
+ Thread.sleep(2000);
+ // Cleaning up the generated files
+ String listenerConfigFile = TestEnvironmentUtil.getUserDir("qa")
+ + "junit/src/org/jboss/soa/esb/actions/jbossesb-listener.xml";
+ File listenerFile = new File(listenerConfigFile);
+ if (listenerFile.exists())
+ listenerFile.delete();
+ String gatewayConfigFile = TestEnvironmentUtil.getUserDir("qa")
+ + "junit/src/org/jboss/soa/esb/actions/jbossesb-gateway.xml";
+ File gatewayFile = new File(gatewayConfigFile);
+ if (gatewayFile.exists())
+ gatewayFile.delete();
+
+ if ("org.hsqldb.jdbcDriver".equals(mDbDriver))
+ {
+ HsqldbUtil.stopHsqldb(mDbUrl, mDbUsername, mDbPassword);
+ }
+ }
+
+}
Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml (rev 0)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml 2007-01-31 08:28:04 UTC (rev 9183)
@@ -0,0 +1,78 @@
+<?xml version = "1.0" encoding = "UTF-8"?>
+<jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.xsd"
+parameterReloadSecs="10">
+
+ <!-- for activemq use:
+ jndi-context-factory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+ jndi-URL="tcp://localhost:61616" -->
+
+ <!-- for mq series use:
+ jndi-context-factory="com.ibm.mq.jms.context.WMQInitialContextFactory"
+ jndi-URL="dev37:1414/SYSTEM.DEF.SVRCONN" -->
+
+ <!-- for jbossmq use:
+ jndi-context-factory="org.jnp.interfaces.NamingContextFactory"
+ jndi-URL="localhost" -->
+
+
+ <providers>
+ <jms-provider name="localhost"
+ connection-factory="ConnectionFactory"
+ jndi-context-factory="org.jnp.interfaces.NamingContextFactory"
+ jndi-URL="localhost" >
+
+ <jms-bus busid="QueueA">
+ <jms-message-filter
+ dest-type="QUEUE"
+ dest-name="queue/A"
+ />
+ </jms-bus>
+ <jms-bus busid="QueueB">
+ <jms-message-filter
+ dest-type="QUEUE"
+ dest-name="queue/B"
+ />
+ </jms-bus>
+ </jms-provider>
+ </providers>
+ <services>
+ <service
+ category="Aggregation"
+ name="Aggregrator"
+ description="Aggregates messages">
+ <listeners>
+ <jms-listener name="AggregatorListenQueue" busidref="QueueA"
+ maxThreads="1">
+ </jms-listener>
+ </listeners>
+ <actions>
+ <action class="org.jboss.soa.esb.actions.Aggregator" name="Aggregator">
+ <property name="timeoutInMillies" value="10000"/>
+ </action>
+ </actions>
+ </service>
+ <service
+ category="MessageRouting"
+ name="SplitterService"
+ description="Sends messages to 2 destinations">
+ <listeners>
+ <jms-listener name="SplitterListenQueue"
+ busidref="QueueB"
+ maxThreads="1">
+ </jms-listener>
+ </listeners>
+ <actions>
+ <action name="print-before" class="org.jboss.soa.esb.actions.SystemPrintln">
+ <property name="message" value="Hello Static Router" />
+ </action>
+ <action class="org.jboss.soa.esb.actions.StaticRouter" name="StaticRouter">
+ <property name="destinations">
+ <!-- send 2 copies straight to the aggregator -->
+ <route-to service-category="Aggregation" service-name="Aggregrator" />
+ <route-to service-category="Aggregation" service-name="Aggregrator" />
+ </property>
+ </action>
+ </actions>
+ </service>
+ </services>
+</jbossesb>
Property changes on: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/actions/AggregatorTest.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list