[jboss-svn-commits] JBL Code SVN: r11420 - in labs/jbossesb/trunk/product: core/listeners/src/org/jboss/soa/esb/actions and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Apr 27 12:53:09 EDT 2007


Author: kurt.stam at jboss.com
Date: 2007-04-27 12:53:09 -0400 (Fri, 27 Apr 2007)
New Revision: 11420

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
   labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
Modified:
   labs/jbossesb/trunk/product/build-distr.xml
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
   labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
   labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
Log:
JBESB-418, adding Dead Message Service. By default this service persists to the messageStore, and notifies the console.

Modified: labs/jbossesb/trunk/product/build-distr.xml
===================================================================
--- labs/jbossesb/trunk/product/build-distr.xml	2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/build-distr.xml	2007-04-27 16:53:09 UTC (rev 11420)
@@ -142,6 +142,7 @@
    		<mkdir dir="${build.dir}/services" />
    		<copy todir="${build.dir}/services">
    			<fileset dir="${services.dir}/jbpm/build" includes="jbpm.esb/**/*"/>
+   			<fileset dir="${services.dir}/jbossesb/build" includes="jbossesb.esb/**/*"/>
    		</copy>
    </target>
 

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java	2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ContentBasedRouter.java	2007-04-27 16:53:09 UTC (rev 11420)
@@ -27,9 +27,6 @@
  */
 package org.jboss.soa.esb.actions;
 
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -43,10 +40,6 @@
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.persistence.MessageStore;
-import org.jboss.soa.esb.services.persistence.MessageStoreException;
-import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
-import org.jboss.soa.esb.services.persistence.MessageStoreType;
 import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.services.registry.RegistryFactory;
@@ -91,15 +84,10 @@
             log.error("The rule destination(s) " + destinations 
                     + " are  not in found in the destination names in the configuration "
                     + _destinations.keySet() + ". Please fix your configuration.");
-            //TODO send it to a Dead Letter Service rather then hard coding it to go to the message store.
-            MessageStore store = MessageStoreFactory.getInstance().getMessageStore(MessageStoreType.DEFAULT_TYPE);
-            assertEquals((store != null), true);
-            try {
-                URI uid = store.addMessage(message, MessageStore.DLQ_CLASSIFICATION);
-                store.setUndelivered(uid);
-            } catch (MessageStoreException mse) {
-                log.error("Could not store undeliverable message.", mse);
-            }
+           
+            MessageRouter.deliverMessage(MessagePersister.SERVICE_CATEGORY
+                                       , MessagePersister.SERVICE_NAME
+                                       , message);
         }
 		return message;
 	}

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java	2007-04-27 16:53:09 UTC (rev 11420)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+/**
+ * Persists the Message argument to a MessageStore under a configurable classification.
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+package org.jboss.soa.esb.actions;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.persistence.MessageStore;
+import org.jboss.soa.esb.services.persistence.MessageStoreException;
+import org.jboss.soa.esb.services.persistence.MessageStoreFactory;
+import org.jboss.soa.esb.services.persistence.MessageStoreType;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.services.routing.MessageRouterException;
+
+public class MessagePersister extends AbstractActionPipelineProcessor
+{
+    public final static String SERVICE_CATEGORY = "JBossESB-Internal";
+    public final static String SERVICE_NAME = "DeadLetterService";
+    public final static String MESSAGE_STORE_TYPE_ATTR = "message-store-type";
+    public final static String CLASSIFICATION_ATTR = "classfication";
+    
+    protected ConfigTree config;
+    protected MessageStore messageStore;
+    private String classification;
+	private Logger log = Logger.getLogger(this.getClass());
+
+	public MessagePersister(ConfigTree config) throws ConfigurationException, RegistryException, MessageRouterException
+	{
+        this.config = config;
+	}
+	/** 
+     * Persists the message to the MessageStore
+	 */
+	public Message process(Message message) throws ActionProcessingException
+	{
+        try {
+            messageStore.addMessage(message, classification);
+        } catch (MessageStoreException mse) {
+            log.error("Could not store undeliverable message.", mse);
+        }
+		return message;
+	}
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.actions.ActionLifecycle#initialise()
+     */
+    public void initialise() throws ActionLifecycleException {
+        //Reading the config
+        String messageStoreTypeValue = config.getAttribute(MESSAGE_STORE_TYPE_ATTR);
+        try {
+            
+            URI messageStoreType = MessageStoreType.DEFAULT_TYPE;
+            if (messageStoreTypeValue!=null) {
+                messageStoreType = new URI(messageStoreTypeValue);
+            }
+            String classificationValue   = config.getAttribute(CLASSIFICATION_ATTR);
+            if (classificationValue!=null) {
+                classification = classificationValue;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("MessagePersister started with classification=" + classification 
+                    + " and message-store-type=" + messageStoreType);
+            }
+            messageStore = MessageStoreFactory.getInstance().getMessageStore(messageStoreType);
+        } catch (URISyntaxException e) {
+                throw new ActionLifecycleException(
+                        "The message store type '" + messageStoreTypeValue + "' is not valid.", e);
+        }
+       
+    }
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.actions.ActionLifecycle#destroy()
+     */
+    public void destroy() throws ActionLifecycleException {
+       classification=null;
+    }
+    
+    
+}


Property changes on: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/MessagePersister.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/core/services/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-04-27 16:53:09 UTC (rev 11420)
@@ -65,6 +65,89 @@
 	public abstract List<String> route(Message message)
     throws MessageRouterException;
 
+    /**
+     * Delivers the message to the given service: looks up the EPRs registered
+     * for the category and name, and tries each of them in turn until one
+     * delivery succeeds.
+     * 
+     * @param serviceCategory - category of the destination service.
+     * @param serviceName - name of the destination service.
+     * @param message - the message that needs to be delivered.
+     * @throws MessageRouterException - if the registry lookup fails or the
+     *            message could not be delivered to any of the EPRs.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized static void deliverMessage(
+            String serviceCategory, String serviceName, Message message) throws MessageRouterException
+    {
+        boolean isSent = false;
+        Exception lastException = null;
+        try
+        {
+            Registry registry = RegistryFactory.getRegistry();
+            logger.log(Priority.DEBUG, "Looking for EPRs for category="
+                    + serviceCategory + " and serviceName=" + serviceName);
+            Collection<EPR> eprs = registry.findEPRs(serviceCategory, serviceName);
+            for (Iterator<EPR> eprIter = eprs.iterator(); eprIter.hasNext();)
+            {
+                // Advance the loop's own iterator so a failed EPR is not retried forever.
+                EPR epr = eprIter.next();
+                logger.log(Priority.DEBUG, "Message=" + message
+                        + " -> Destination=" + serviceCategory + "-" + serviceName);
+                try
+                {
+                    //TODO I think the following if block should be part of
+                    //the FileEpr construction?
+                    if (epr instanceof FileEpr) {
+                        try {
+                            FileEpr fileEpr = (FileEpr) epr;
+                            FileEpr newEpr = new FileEpr(fileEpr.getURL());
+                            newEpr.setPostDelete(false);
+                            newEpr.setPostDirectory(fileEpr.getURL().getFile());
+                            newEpr.setPostSuffix(fileEpr.getInputSuffix());
+                            epr = newEpr;
+                        } catch (URISyntaxException usex) {
+                            logger.log(Priority.ERROR, "Malformed epr while setting :"
+                                    + epr, usex);
+                        } catch (MalformedURLException muex) {
+                            logger.log(Priority.ERROR, "Malformed epr:"
+                                    + epr, muex);
+                        }
+                    }
+                    // Give the message to the courier
+                    Courier courier = CourierFactory.getCourier(epr);
+                    try
+                    {
+                        courier.deliver(message);
+                    }
+                    finally
+                    {
+                        CourierUtil.cleanCourier(courier) ;
+                    }
+                    isSent = true;
+                    break;
+                } catch (MalformedEPRException ex) {
+                    logger.log(Priority.ERROR, "Malformed epr:" + epr);
+                    lastException = ex;
+                } catch (CourierException ce) {
+                    logger.log(Priority.ERROR, "Could not send using epr:"
+                            + epr, ce);
+                    lastException = ce;
+                    // if there are more eprs in the collection it will try the next one.
+                }
+            }
+            if (isSent == false)
+            {
+                logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.", lastException);
+                throw new MessageRouterException("Message Could not be delivered. ", lastException);
+            }
+        } catch (RegistryException re) {
+            logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. "
+                            + re.getLocalizedMessage(), re);
+            throw new MessageRouterException("Message Could not be delivered due to issues with the Registry. "
+                    + re.getMessage(), re);
+        }
+    }
 	/**
 	 * Sends the message on to the service with the name(s) we just obtained
 	 * from the routing.
@@ -99,77 +182,9 @@
                     logger.debug(AGGEGRATOR_TAG+ "=" + tag);
                 }
             }
-         
             String[] destination = (String[]) i.next();
-			String category = destination[0];
-			String serviceName = destination[1];
-			boolean isSent = false;
-            Exception lastException = null;
-			try
-			{
-				Registry registry = RegistryFactory.getRegistry();
-				logger.log(Priority.DEBUG, "Looking for EPRs for category="
-						+ category + " and serviceName=" + serviceName);
-				Collection<EPR> eprs = registry.findEPRs(category, serviceName);
-				for (Iterator<EPR> eprIter = eprs.iterator(); eprIter.hasNext();)
-				{
-					EPR epr = eprs.iterator().next();
-					logger.log(Priority.DEBUG, "Message=" + message
-							+ " -> Destination=" + destinations);
-					try
-					{
-                        //TODO I think the following if block should be part of
-                        //the FileEpr construction?
-                        if (epr instanceof FileEpr) {
-                            try {
-                                FileEpr fileEpr = (FileEpr) epr;
-                                FileEpr newEpr = new FileEpr(fileEpr.getURL());
-                                newEpr.setPostDelete(false);
-                                newEpr.setPostDirectory(fileEpr.getURL().getFile());
-                                newEpr.setPostSuffix(fileEpr.getInputSuffix());
-                                epr = newEpr;
-                            } catch (URISyntaxException usex) {
-                                logger.log(Priority.ERROR, "Malformed epr while setting :"
-                                        + epr, usex);
-                            } catch (MalformedURLException muex) {
-                                logger.log(Priority.ERROR, "Malformed epr:"
-                                        + epr, muex);
-                            }
-                        }
-						// Give the message to the courier
-						Courier courier = CourierFactory.getCourier(epr);
-                                                try
-                                                {
-                                                    courier.deliver(message);
-                                                }
-                                                finally
-                                                {
-                                                    CourierUtil.cleanCourier(courier) ;
-                                                }
-						isSent = true;
-						break;
-					} catch (MalformedEPRException ex) {
-                        logger.log(Priority.ERROR, "Malformed epr:" + epr);
-                        lastException = ex;
-                    } catch (CourierException ce) {
-                        logger.log(Priority.ERROR, "Could not send using epr:"
-                                + epr, ce);
-                         lastException = ce;
-                        // if there are more eprs in the collection is will try
-                        // the next one.
-                    }
-				}
-				if (isSent == false)
-				{
-					logger.log(Priority.ERROR, "Could not find any valid EPRs. Message is not routed.", lastException);
-					throw new MessageRouterException("Message Could not be delivered. ", lastException);
-				}
-            } catch (RegistryException re) {
-				logger.log(Priority.ERROR, "Could not obtain an EPR from the Registry. Message is not routed. "
-								+ re.getLocalizedMessage(), re);
-                throw new MessageRouterException("Message Could not be delivered due to issues with the Registry. "
-                        + re.getMessage(), re);
-			}
+            deliverMessage(destination[0], destination[1], message);
+            
 		}
 	}
 

Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml	2007-04-27 16:29:21 UTC (rev 11419)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/META-INF/jboss-esb.xml	2007-04-27 16:53:09 UTC (rev 11420)
@@ -4,4 +4,44 @@
      		jbossesb internal services 
      -->
      
+     <providers>
+          <jms-provider name="Default-JMS-Provider" connection-factory="ConnectionFactory">          
+              <jms-bus busid="DeadMessageQueue">
+                  <jms-message-filter
+                      dest-type="QUEUE"
+                      dest-name="queue/DeadMessageQueue"
+                  />
+              </jms-bus>
+          </jms-provider>
+      </providers>
+      
+     <services>   
+        <service category="JBossESB-Internal" 
+                 name="DeadLetterService" 
+                 description="Dead Messages can be send to this service, which is configured to store and/or
+                 notify" >
+            <listeners>
+                <jms-listener name="JMS-DLQListener"
+                              busidref="DeadMessageQueue"
+                              maxThreads="1"
+                />                
+            </listeners>
+            <actions>
+               <action name="PersistAction" 
+               		class="org.jboss.soa.esb.actions.MessagePersister" >
+               		<property name="classfication" value="DLQ"/>
+               		<property name="message-store-type" value="urn:jboss/esb/persistence/type/DATABASE"/>
+               </action>
+               <action name="notificationAction" 
+               		class="org.jboss.soa.esb.actions.Notifier">
+               		<property name="okMethod" value="notifyOK" />
+               		<property name="notification-details">
+   	 				   <NotificationList> 
+      				     <target class="NotifyConsole" />
+	    		   	   </NotificationList> 
+	    		   </property>
+   	   			</action>    
+            </actions>
+        </service>
+      </services>
 </jbossesb>

Added: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml	                        (rev 0)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml	2007-04-27 16:53:09 UTC (rev 11420)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<server>
+  <mbean code="org.jboss.jms.server.destination.QueueService"
+    name="jboss.messaging.destination:service=Queue,name=DeadMessageQueue"
+    xmbean-dd="xmdesc/Queue-xmbean.xml">
+    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+  </mbean>
+</server>


Property changes on: labs/jbossesb/trunk/product/services/jbossesb/src/main/resources/jbm-queue-service.xml
___________________________________________________________________
Name: svn:mime-type
   + text/xml
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list