[jboss-svn-commits] JBL Code SVN: r7372 - in labs/jbossesb/trunk: product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr product/core/services/src/org/jboss/soa/esb/services/routing qa/junit/src/org/jboss/soa/esb/listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Nov 3 13:34:46 EST 2006
Author: kurt.stam at jboss.com
Date: 2006-11-03 13:34:35 -0500 (Fri, 03 Nov 2006)
New Revision: 7372
Modified:
labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListenerTest.java
Log:
Working on CBR configuration
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java 2006-11-03 17:20:11 UTC (rev 7371)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/internal/soa/esb/services/routing/cbr/JBossRulesRouter.java 2006-11-03 18:34:35 UTC (rev 7372)
@@ -25,6 +25,7 @@
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.services.registry.RegistryFactory;
import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.routing.MessageRouter;
import org.jboss.soa.esb.services.routing.MessageRouterException;
import org.jboss.soa.esb.services.routing.cbr.ContentBasedRouter;
@@ -70,39 +71,10 @@
//Now route there, later we will implement an option to place a callback.
destinationServices = (List) workingMemory.getGlobal("destinationServices");
logger.log(Priority.DEBUG, "Destination Services List: " + destinationServices);
- for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
- String destinationService = i.next();
- String[] strArray = destinationService.split(":");
- String category = strArray[0];
- String serviceName = strArray[1];
- boolean isSent=false;
- try {
- Registry registry = RegistryFactory.getRegistry();
- logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
- " and serviceName=" + serviceName);
- Collection<EPR> eprs = registry.findEPRs(category, serviceName);
- for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
- EPR epr = eprs.iterator().next();
- logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
- try {
- //Give the message to the courier
- Courier courier = CourierFactory.getCourier(epr);
- courier.deliver(message);
- isSent=true;
- break;
- } catch (CourierException ce) {
- logger.log(Priority.ERROR, "Could not send using epr:" + epr);
- //if there are more eprs in the collection is will try the next one.
- }
- }
- if (isSent==false) {
- logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
- //Route to /dev/null?
- }
- } catch (RegistryException re) {
- logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
- //Route to /dev/null?
- }
+ Boolean deliverMessages = (Boolean) message.getBody().get(MessageRouter.DELIVER_MESSAGES);
+ //Only actuall deliver the message if this is set in the message
+ if (Boolean.TRUE.equals(deliverMessages)) {
+ deliverMessages(destinationServices, message);
}
} catch (Exception e) {
logger.log(Priority.ERROR, e.getMessage() + ". Message is not routed.", e);
@@ -131,5 +103,48 @@
ruleBase.addPackage(pkg);
return ruleBase;
}
+ /**
+ * Sends the message on to the service with the name(s) we just obtained from the routing.
+ *
+ * @param destinationServices - Collection with the name of the destination services.
+ * @param message - the message that needs routing and delivery
+ */
+ private void deliverMessages(Collection<String> destinationServices, Message message)
+ {
+ for (Iterator<String> i=destinationServices.iterator();i.hasNext();) {
+ String destinationService = i.next();
+ String[] strArray = destinationService.split(":");
+ String category = strArray[0];
+ String serviceName = strArray[1];
+ boolean isSent=false;
+ try {
+ Registry registry = RegistryFactory.getRegistry();
+ logger.log(Priority.INFO, "Looking for EPRs for category=" + category +
+ " and serviceName=" + serviceName);
+ Collection<EPR> eprs = registry.findEPRs(category, serviceName);
+ for (Iterator<EPR> eprIter=eprs.iterator();eprIter.hasNext();) {
+ EPR epr = eprs.iterator().next();
+ logger.log(Priority.INFO, "Message=" + message + " -> Destination=" + destinationService);
+ try {
+ //Give the message to the courier
+ Courier courier = CourierFactory.getCourier(epr);
+ courier.deliver(message);
+ isSent=true;
+ break;
+ } catch (CourierException ce) {
+ logger.log(Priority.ERROR, "Could not send using epr:" + epr);
+ //if there are more eprs in the collection is will try the next one.
+ }
+ }
+ if (isSent==false) {
+ logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.");
+ //Route to /dev/null?
+ }
+ } catch (RegistryException re) {
+ logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. " + re.getLocalizedMessage(), re);
+ //Route to /dev/null?
+ }
+ }
+ }
}
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2006-11-03 17:20:11 UTC (rev 7371)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2006-11-03 18:34:35 UTC (rev 7372)
@@ -33,6 +33,7 @@
public interface MessageRouter
{
String ROUTING_DESTINATION_SERVICE_LIST = "routing.destinationServiceList";
+ String DELIVER_MESSAGES = "routing.deliverMessages";
/**
* Routes the message to the next destination.
*
Modified: labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListenerTest.java
===================================================================
--- labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListenerTest.java 2006-11-03 17:20:11 UTC (rev 7371)
+++ labs/jbossesb/trunk/qa/junit/src/org/jboss/soa/esb/listeners/message/CbrJmsQueueListenerTest.java 2006-11-03 18:34:35 UTC (rev 7372)
@@ -58,7 +58,7 @@
*/
public class CbrJmsQueueListenerTest
{
- private Logger logger = Logger.getLogger(this.getClass());
+ private static Logger logger = Logger.getLogger(CbrJmsQueueListenerTest.class);
private static EsbListenerController _proc = null;
@@ -74,36 +74,32 @@
* Testing the Content Based Router.
*/
@Test
- public void instantiate()
+ public void sendMessages()
{
try {
- logger.info("Testing to see if we can instantiate one");
- String deploymentConfigFile = TestUtil.getPrefix("qa")
- + "junit/src/org/jboss/soa/esb/listeners/message/ContentBasedRouting.xml";
- _proc = new EsbListenerController(deploymentConfigFile);
- new Thread(_proc).start();
- // give the listener time to register
- Thread.sleep(4000);
- sendOneMessage();
- _proc.requestEnd();
-// //give the action class time to finish
- Thread.sleep(4000);
-//
- EsbListenerController.State oS = _proc.getState();
- System.out.println("Exit state = "+oS.toString());
+ sendMessage("First Message");
+ //The second time the rulesEngine should be primed.
+ sendMessage("Second Message");
} catch (Exception e) {
assertTrue(false);
e.printStackTrace();
}
}
-
- private static void sendOneMessage() throws Exception{
+ /**
+ * Sends a message to the CbrJmsQueueListener.
+ *
+ * @param body - a String containing the body of the message.
+ * @throws Exception
+ */
+ private static void sendMessage(String body) throws Exception{
Message msg = MessageFactory.getInstance().getMessage();
- msg.getBody().setContents("Routing Test Body".getBytes());
+ msg.getBody().setContents(body.getBytes());
+ //msg.getBody().add(MessageRouter.DELIVER_MESSAGES, Boolean.TRUE);
try {
Registry registry = RegistryFactory.getRegistry();
Collection<EPR> eprs = registry.findEPRs(SERVICE_CATEGORY_NAME, SERVICE_NAME);
for (Iterator<EPR> eprIterator=eprs.iterator();eprIterator.hasNext();){
+ //Just use the first EPR in the list.
EPR epr = eprIterator.next();
Courier courier = CourierFactory.getCourier(epr);
courier.deliver(msg);
@@ -168,11 +164,23 @@
stmnt.execute(sqlCreateCmd);
System.out.println("Adding the jbossesb publisher");
stmnt.execute(sqlInsertPubCmd);
+
+ //Now we can bring up the ContentBasedRouter
+ logger.info("Testing to see if we can instantiate one");
+ String deploymentConfigFile = TestUtil.getPrefix("qa")
+ + "junit/src/org/jboss/soa/esb/listeners/message/ContentBasedRouting.xml";
+ _proc = new EsbListenerController(deploymentConfigFile);
+ new Thread(_proc).start();
+ // give the listener time to register
+ Thread.sleep(5000);
+
} catch (Exception e) {
e.printStackTrace();
System.out.println("We should stop testing, since we don't have a db.");
assertTrue(false);
}
+
+
}
/**
@@ -181,6 +189,12 @@
*/
@AfterClass
public static void runAfterAllTests() throws Exception {
+ _proc.requestEnd();
+// give the controller time to finish
+ Thread.sleep(2000);
+ EsbListenerController.State oS = _proc.getState();
+ System.out.println("Exit state = "+oS.toString());
+
if ("org.hsqldb.jdbcDriver".equals(mDbDriver)) {
HsqldbUtil.stopHsqldb(mDbUrl, mDbUsername, mDbPassword);
}
More information about the jboss-svn-commits
mailing list