[jboss-svn-commits] JBL Code SVN: r11420 - in labs/jbossesb/trunk/product: core/listeners/src/org/jboss/soa/esb/actions and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Apr 27 12:53:09 EDT 2007
Author: kurt.stam at jboss.com
Date: 2007-04-27 12:53:09 -0400 (Fri, 27 Apr 2007)
New Revision: 11420
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
Modified:
labs/jbossesb/trunk/product/build-distr.xml
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
Log:
JBESB-418: adding the Dead Message Service. By default this service persists messages to the MessageStore and sends a notification to the console.
Modified: labs/jbossesb/trunk/product/build-distr.xml
===================================================================
--- labs/jbossesb/trunk/product/build-distr.xml 2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/build-distr.xml 2007-04-27 16:53:09 UTC (rev 11420)
@@ -142,6 +142,7 @@
<mkdir dir="${build.dir}/services" />
<copy todir="${build.dir}/services">
<fileset dir="${services.dir}/jbpm/build" includes="jbpm.esb/**/*"/>
+ <fileset dir="${services.dir}/jbossesb/build" includes="jbossesb.esb/**/*"/>
</copy>
</target>
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java 2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java 2007-04-27 16:53:09 UTC (rev 11420)
@@ -27,9 +27,6 @@
*/
package org.jboss.soa.esb.actions;
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -43,10 +40,6 @@
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.persistence.MessageStore;
-import org.jboss.soa.esb.services.persistence.MessageStoreException;
-import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
-import org.jboss.soa.esb.services.persistence.MessageStoreType;
import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.services.registry.RegistryFactory;
@@ -91,15 +84,10 @@
log.error("The rule destination(s) " + destinations
+ " are not in found in the destination names in the configuration "
+ _destinations.keySet() + ". Please fix your configuration.");
- //TODO send it to a Dead Letter Service rather then hard coding it to go to the message store.
- MessageStore store = MessageStoreFactory.getInstance().getMessageStore(MessageStoreType.DEFAULT_TYPE);
- assertEquals((store != null), true);
- try {
- URI uid = store.addMessage(message, MessageStore.DLQ_CLASSIFICATION);
- store.setUndelivered(uid);
- } catch (MessageStoreException mse) {
- log.error("Could not store undeliverable message.", mse);
- }
+
+ MessageRouter.deliverMessage(MessagePersister.SERVICE_CATEGORY
+ , MessagePersister.SERVICE_NAME
+ , message);
}
return message;
}
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java 2007-04-27 16:53:09 UTC (rev 11420)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * Routes the Message argument to a fixed list of services ([category,name])
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+package org.jboss.soa.esb.actions;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.persistence.MessageStore;
+import org.jboss.soa.esb.services.persistence.MessageStoreException;
+import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
+import org.jboss.soa.esb.services.persistence.MessageStoreType;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.routing.MessageRouterException;
+
+public class MessagePersister extends AbstractActionPipelineProcessor
+{
+ public final static String SERVICE_CATEGORY = "JBossESB-Internal";
+ public final static String SERVICE_NAME = "DeadLetterService";
+ public final static String MESSAGE_STORE_TYPE_ATTR = "message-store-type";
+ public final static String CLASSIFICATION_ATTR = "classfication";
+
+ protected ConfigTree config;
+ protected MessageStore messageStore;
+ private String classification;
+ private Logger log = Logger.getLogger(this.getClass());
+
+ public MessagePersister(ConfigTree config) throws ConfigurationException, RegistryException, MessageRouterException
+ {
+ this.config = config;
+ }
+ /**
+ * Persists the message to the MessageStore
+ */
+ public Message process(Message message) throws ActionProcessingException
+ {
+ try {
+ messageStore.addMessage(message, classification);
+ } catch (MessageStoreException mse) {
+ log.error("Could not store undeliverable message.", mse);
+ }
+ return message;
+ }
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.actions.ActionLifecycle#initialise()
+ */
+ public void initialise() throws ActionLifecycleException {
+ //Reading the config
+ String messageStoreTypeValue = config.getAttribute(MESSAGE_STORE_TYPE_ATTR);
+ try {
+
+ URI messageStoreType = MessageStoreType.DEFAULT_TYPE;
+ if (messageStoreTypeValue!=null) {
+ messageStoreType = new URI(messageStoreTypeValue);
+ }
+ String classificationValue = config.getAttribute(CLASSIFICATION_ATTR);
+ if (classificationValue!=null) {
+ classification = classificationValue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("MessagePersister started with classification=" + classification
+ + " and message-store-type=" + messageStoreType);
+ }
+ messageStore = MessageStoreFactory.getInstance().getMessageStore(messageStoreType);
+ } catch (URISyntaxException e) {
+ throw new ActionLifecycleException(
+ "The message store type '" + messageStoreTypeValue + "' is not valid.", e);
+ }
+
+ }
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.actions.ActionLifecycle#destroy()
+ */
+ public void destroy() throws ActionLifecycleException {
+ classification=null;
+ }
+
+
+}
Property changes on: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-04-27 16:53:09 UTC (rev 11420)
@@ -65,6 +65,89 @@
public abstract List<String> route(Message message)
throws MessageRouterException;
+ /**
+ * Sends the message on to the service with the name(s) we just obtained
+ * from the routing.
+ *
+ * @param destinationServices -
+ * Collection with the name of the destination services.
+ * @param message -
+ * the message that needs routing and delivery
+ * @param boolean - isSpitter, if true will Tag the messages for
+ * aggregation purposes.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized static void deliverMessage(
+ String serviceCategory, String serviceName, Message message) throws MessageRouterException
+ {
+ boolean isSent = false;
+ Exception lastException = null;
+ try
+ {
+ Registry registry = RegistryFactory.getRegistry();
+ logger.log(Priority.DEBUG, "Looking for EPRs for category="
+ + serviceCategory + " and serviceName=" + serviceName);
+ Collection<EPR> eprs = registry.findEPRs(serviceCategory, serviceName);
+ for (Iterator<EPR> eprIter = eprs.iterator(); eprIter.hasNext();)
+ {
+ EPR epr = eprs.iterator().next();
+ logger.log(Priority.DEBUG, "Message=" + message
+ + " -> Destination=" + serviceCategory + "-" + serviceName);
+ try
+ {
+ //TODO I think the following if block should be part of
+ //the FileEpr construction?
+ if (epr instanceof FileEpr) {
+ try {
+ FileEpr fileEpr = (FileEpr) epr;
+ FileEpr newEpr = new FileEpr(fileEpr.getURL());
+ newEpr.setPostDelete(false);
+ newEpr.setPostDirectory(fileEpr.getURL().getFile());
+ newEpr.setPostSuffix(fileEpr.getInputSuffix());
+ epr = newEpr;
+ } catch (URISyntaxException usex) {
+ logger.log(Priority.ERROR, "Malformed epr while setting :"
+ + epr, usex);
+ } catch (MalformedURLException muex) {
+ logger.log(Priority.ERROR, "Malformed epr:"
+ + epr, muex);
+ }
+ }
+ // Give the message to the courier
+ Courier courier = CourierFactory.getCourier(epr);
+ try
+ {
+ courier.deliver(message);
+ }
+ finally
+ {
+ CourierUtil.cleanCourier(courier) ;
+ }
+ isSent = true;
+ break;
+ } catch (MalformedEPRException ex) {
+ logger.log(Priority.ERROR, "Malformed epr:" + epr);
+ lastException = ex;
+ } catch (CourierException ce) {
+ logger.log(Priority.ERROR, "Could not send using epr:"
+ + epr, ce);
+ lastException = ce;
+ // if there are more eprs in the collection is will try
+ // the next one.
+ }
+ }
+ if (isSent == false)
+ {
+ logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.", lastException);
+ throw new MessageRouterException("Message Could not be delivered. ", lastException);
+ }
+ } catch (RegistryException re) {
+ logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. "
+ + re.getLocalizedMessage(), re);
+ throw new MessageRouterException("Message Could not be delivered due to issues with the Registry. "
+ + re.getMessage(), re);
+ }
+ }
/**
* Sends the message on to the service with the name(s) we just obtained
* from the routing.
@@ -99,77 +182,9 @@
logger.debug(AGGEGRATOR_TAG+ "=" + tag);
}
}
-
String[] destination = (String[]) i.next();
- String category = destination[0];
- String serviceName = destination[1];
- boolean isSent = false;
- Exception lastException = null;
- try
- {
- Registry registry = RegistryFactory.getRegistry();
- logger.log(Priority.DEBUG, "Looking for EPRs for category="
- + category + " and serviceName=" + serviceName);
- Collection<EPR> eprs = registry.findEPRs(category, serviceName);
- for (Iterator<EPR> eprIter = eprs.iterator(); eprIter.hasNext();)
- {
- EPR epr = eprs.iterator().next();
- logger.log(Priority.DEBUG, "Message=" + message
- + " -> Destination=" + destinations);
- try
- {
- //TODO I think the following if block should be part of
- //the FileEpr construction?
- if (epr instanceof FileEpr) {
- try {
- FileEpr fileEpr = (FileEpr) epr;
- FileEpr newEpr = new FileEpr(fileEpr.getURL());
- newEpr.setPostDelete(false);
- newEpr.setPostDirectory(fileEpr.getURL().getFile());
- newEpr.setPostSuffix(fileEpr.getInputSuffix());
- epr = newEpr;
- } catch (URISyntaxException usex) {
- logger.log(Priority.ERROR, "Malformed epr while setting :"
- + epr, usex);
- } catch (MalformedURLException muex) {
- logger.log(Priority.ERROR, "Malformed epr:"
- + epr, muex);
- }
- }
- // Give the message to the courier
- Courier courier = CourierFactory.getCourier(epr);
- try
- {
- courier.deliver(message);
- }
- finally
- {
- CourierUtil.cleanCourier(courier) ;
- }
- isSent = true;
- break;
- } catch (MalformedEPRException ex) {
- logger.log(Priority.ERROR, "Malformed epr:" + epr);
- lastException = ex;
- } catch (CourierException ce) {
- logger.log(Priority.ERROR, "Could not send using epr:"
- + epr, ce);
- lastException = ce;
- // if there are more eprs in the collection is will try
- // the next one.
- }
- }
- if (isSent == false)
- {
- logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.", lastException);
- throw new MessageRouterException("Message Could not be delivered. ", lastException);
- }
- } catch (RegistryException re) {
- logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. "
- + re.getLocalizedMessage(), re);
- throw new MessageRouterException("Message Could not be delivered due to issues with the Registry. "
- + re.getMessage(), re);
- }
+ deliverMessage(destination[0], destination[1], message);
+
}
}
Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml 2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml 2007-04-27 16:53:09 UTC (rev 11420)
@@ -4,4 +4,44 @@
jbossesb internal services
-->
+ <providers>
+ <jms-provider name="Default-JMS-Provider" connection-factory="ConnectionFactory">
+ <jms-bus busid="DeadMessageQueue">
+ <jms-message-filter
+ dest-type="QUEUE"
+ dest-name="queue/DeadMessageQueue"
+ />
+ </jms-bus>
+ </jms-provider>
+ </providers>
+
+ <services>
+ <service category="JBossESB-Internal"
+ name="DeadLetterService"
+ description="Dead Messages can be send to this service, which is configured to store and/or
+ notify" >
+ <listeners>
+ <jms-listener name="JMS-DLQListener"
+ busidref="DeadMessageQueue"
+ maxThreads="1"
+ />
+ </listeners>
+ <actions>
+ <action name="PersistAction"
+ class="org.jboss.soa.esb.actions.MessagePersister" >
+ <property name="classfication" value="DLQ"/>
+ <property name="message-store-type" value="urn:jboss/esb/persistence/type/DATABASE"/>
+ </action>
+ <action name="notificationAction"
+ class="org.jboss.soa.esb.actions.Notifier">
+ <property name="okMethod" value="notifyOK" />
+ <property name="notification-details">
+ <NotificationList>
+ <target class="NotifyConsole" />
+ </NotificationList>
+ </property>
+ </action>
+ </actions>
+ </service>
+ </services>
</jbossesb>
Added: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml (rev 0)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml 2007-04-27 16:53:09 UTC (rev 11420)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+ <mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=DeadMessageQueue"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ </mbean>
+</server>
Property changes on: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
___________________________________________________________________
Name: svn:mime-type
+ text/xml
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list