[jboss-svn-commits] JBL Code SVN: r6718 - in labs/jbossesb/trunk/product/core/listeners: src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/actions tests/src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/util

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Oct 10 08:57:13 EDT 2006


Author: tfennelly
Date: 2006-10-10 08:57:04 -0400 (Tue, 10 Oct 2006)
New Revision: 6718

Added:
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Log:
Added actions config override as requested by feature request http://jira.jboss.com/jira/browse/JBESB-176

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -25,9 +25,9 @@
 import java.util.Arrays;
 
 import org.apache.log4j.Logger;
-import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.actions.ActionDefinition;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
 import org.jboss.soa.esb.actions.ActionProcessor;
 import org.jboss.soa.esb.actions.ActionUtils;
 import org.jboss.soa.esb.helpers.DomElement;
@@ -40,7 +40,14 @@
  * @since Version 4.0
  */
 public abstract class AbstractListener implements Runnable {   
-    
+
+    /**
+	 * Name of the Message attachment carrying the list of actions to be applied to the
+	 * incoming message.  This allows the configured processing pipeline to be overridden by the Message
+	 * producer. 
+	 */
+	public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
+	
     // You can override these values at constructor time of your
     // derived class after calling super(GpListener,DomElement)
     protected int m_iSleepForThreads = 3000; // default sleep if no threads available
@@ -66,12 +73,10 @@
         m_oThrGrp 	= new ThreadGroup(listenerConfig.getName());
 
         String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
-        m_oActions 	= sAtt.split(",");
+        if(!sAtt.trim().equals("")) {
+        	m_oActions 	= sAtt.split(",");
+        }
         
-        if(m_oActions.length == 0) {
-            throw new ConfigurationException("Listener 'actions' list must be specified.");
-        }
-
         sAtt		= GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
         int iMax 	= Integer.parseInt(sAtt);
         m_iMaxThr 	= Math.min(iMax, m_iUpperThreadLimit);
@@ -86,10 +91,10 @@
     public void run() {
         while (m_oDad.continueLooping()) {
             Object[] processList = receive();
-            if (null==processList)
+            if (null==processList) {
             	try { Thread.sleep(500); }
             	catch(InterruptedException e) {/*  ok  do nothing  */}
-            else
+            } else {
               for (Object currentObj : processList) {
                 if (m_iQthr >= m_iMaxThr) {
                     logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
@@ -104,9 +109,23 @@
                 // Spawn a thread and push the message message through the pipeline...
                 ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
                 new Thread(runner).start();
+                incThreads();
+              }
             }
         }
         
+        // Wait for all the processing pipelines to complete before closing the listener and exiting...
+        while(m_iQthr > 0) {
+        	logger.info("Waiting for all processing pipelines to complete.");
+        	try {
+				Thread.sleep(200);
+			} catch (InterruptedException e) {
+	        	logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
+			}
+        }
+
+    	logger.info("All processing pipelines complete. Closing listener now.");
+        
         close();
     }
     
@@ -121,8 +140,9 @@
     /**
      * Called on the listener implementation when pipeline processing error has occured.
      * @param initialMsg The message that was initialy supplied to the pipeline.
-     * @param processor The processor raised the error.
-     * @param error The error.
+     * @param processor The processor that raised the error.  Can be null where the error was raised before
+     * pipeline processing of the message.
+     * @param error The error.  Can be null.
      */
     protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
 
@@ -164,7 +184,7 @@
      */
     private class ActionProcessingPipeline implements Runnable {
         
-        private Object initialObject;
+		private Object initialObject;
              
         /**
          * Private constructor.
@@ -181,15 +201,21 @@
             String currentAction = null;
             ActionProcessor currentProcessor = null;
             
-            // Increment the active thread count for the listener on starting...
-            incThreads();
-            
             try {
-                Message message = m_oMsgFactory.getMessage();
-                ActionUtils.putCurrentObject(message,initialObject);
-
+                Message message;
+                String[] actions;
+                
+                if(initialObject instanceof Message) {
+                	message = (Message)initialObject;
+                } else {
+	                message = m_oMsgFactory.getMessage();
+	                ActionUtils.putCurrentObject(message,initialObject);
+                }
+                
+                actions = getActions(message);
+                
                 // Run the message through each ActionProcessor...
-                for(String action : m_oActions) {
+                for(String action : actions) {
                     ActionDefinition actionDefinition;
 
                     currentAction = action.trim();
@@ -209,25 +235,59 @@
                     }
                     
                     if(message == null && action != m_oActions[m_oActions.length - 1]) {
-                        logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
-                        break;
+                    	String exceptionMessage = "Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
+                        processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
+                        logger.warn(exceptionMessage);
+                        return;
                     }
                     // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
                     GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
                     
                     // Setup the message for processing by the next processor...
-                    message.getBody().remove(ActionUtils.BEFORE_ACTION);
+                    if(message != null) {
+                    	message.getBody().remove(ActionUtils.BEFORE_ACTION);
+                    }
                 }
+                
+                processingComplete(initialObject);
             } catch(Throwable thrown) {
                 processingError(initialObject, currentProcessor, thrown);
-                logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] threw an exception.", thrown);
+                logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
+            } finally {
+	            // Decrement the active thread count for the listener on completion...
+	            decThreads();
             }
-            
-            processingComplete(initialObject);
-            
-            // Decrement the active thread count for the listener on completion...
-            decThreads();
         }
+
+		/**
+		 * Get the list of actions to be applied to the supplied message.
+		 * @param message The message to be processed.
+		 * @return The set of processing actions to be performed on the message. 
+		 * @throws ActionProcessingException Invalid actions list attachment setting.
+		 */
+		private String[] getActions(Message message) throws ActionProcessingException {
+			// Check whether there is an attachment specifying an override pipeline config...
+			Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
+			if(overrideActionsAttachment != null) {
+				if(overrideActionsAttachment instanceof String) {
+					String overrideActions = (String)overrideActionsAttachment;
+					
+					if(overrideActions.trim().equals("")) {
+			        	throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value.  Aborting message processing.");
+					}
+					
+					return overrideActions.split(",");
+				} else {
+					throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String.  Received [" + overrideActionsAttachment.getClass().getName() + "].  Aborting message processing.");
+				}
+			} else {
+				// Otherwise use the actions configured on the listener...
+				if(m_oActions == null || m_oActions.length == 0) {
+					throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "].  Aborting message processing.");
+				}
+				return m_oActions;
+			}
+		}
     }
     
 } // ____________________________________________________________________________

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -162,6 +162,12 @@
 	private static CommandQueue defaultCommandQueue = null;
 
 	/**
+	 * Protected default constructor, for use by subclasses.
+	 */
+	protected GpListener() {		
+	}
+	
+	/**
 	 * Construct a Listener Manager from the named repository based
 	 * configuration.
 	 * 

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -23,6 +23,9 @@
 package org.jboss.soa.esb.actions;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.Vector;
+
 import org.jboss.soa.esb.message.Message;
 
 /**
@@ -32,10 +35,26 @@
  */
 public abstract class BaseTestActionProcessor implements ActionProcessor {
 
+	public ActionProcessingException exception;
+	public List<Message> processedMessages = new Vector<Message>();
+	public boolean returnNull = false;
+	
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.actions.ActionProcessor#process(java.lang.Object)
      */
-    public abstract Message process(Message message) throws ActionProcessingException;
+    public Message process(Message message) throws ActionProcessingException {
+    	if(exception != null) {
+    		throw exception;
+    	}
+    	
+    	processedMessages.add(message);
+    	
+    	if(returnNull) {
+    		return null;
+    	}
+    	
+    	return message;
+    }
 
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.actions.ActionProcessor#getOkNotification(java.lang.Object)

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -24,6 +24,7 @@
 
 import java.util.List;
 
+import org.apache.log4j.Logger;
 import org.jboss.soa.esb.helpers.KeyValuePair;
 import org.jboss.soa.esb.message.Message;
 
@@ -35,12 +36,11 @@
 
 public class TestActionProcessor1 extends BaseTestActionProcessor {
     
+	private static Logger logger = Logger.getLogger(TestActionProcessor1.class);
     public String name;
     public List<KeyValuePair> properties;
 
 	public TestActionProcessor1(String name, List<KeyValuePair> properties) {
-		System.out.println("Instantiate action handler: " + name);
-        
         this.name = name;
         this.properties = properties;
 	}
@@ -48,8 +48,8 @@
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
      */
-    public Message process(Message message) {
-        System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
-        return message;
+    public Message process(Message message) throws ActionProcessingException {
+    	logger.info("Processing action [" + name + "]: " + ActionUtils.currentFromMessage(message));
+        return super.process(message);
     }
 }

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -21,6 +21,8 @@
  */
 
 package org.jboss.soa.esb.actions;
+
+import org.apache.log4j.Logger;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.actions.ActionUtils;
 
@@ -31,12 +33,14 @@
  */
 
 public class TestActionProcessor2 extends BaseTestActionProcessor {
-    
+
+	private static Logger logger = Logger.getLogger(TestActionProcessor2.class);
+	
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
      */
-    public Message process(Message message) {
-        System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
-        return message;
+    public Message process(Message message) throws ActionProcessingException {
+    	logger.info("Processing action: " + ActionUtils.currentFromMessage(message));
+        return super.process(message);
     }
 }

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -39,8 +39,8 @@
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
      */
-    public Message process(Message message) {
+    public Message process(Message message) throws ActionProcessingException {
         System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
-        return message;
+        return super.process(message);
     }
 }

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.actions.BaseTestActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.util.MockNonblockingListener;
+
+import junit.framework.TestCase;
+
+/**
+ * AbstractListener tests.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AbstractListenerTest extends TestCase {
+
+	private MockGpListener gpListener;
+	private ActionDefinitionFactory factory;
+	private DomElement listenerConfig;
+	private ActionDefinition action_ActionA;
+	private ActionDefinition action_ActionB;
+	private ActionDefinition action_ActionC;
+	
+	/* (non-Javadoc)
+	 * @see junit.framework.TestCase#setUp()
+	 */
+	@Override
+	protected void setUp() throws Exception {
+		gpListener = new MockGpListener();
+		listenerConfig = new DomElement("listenerConfig");
+
+        DomElement config = DomElement.fromInputStream(getClass().getResourceAsStream("AbstractListener_ActionConfig.xml"));
+        factory = new ActionDefinitionFactory(config);
+        action_ActionA = factory.getInstance("ActionA");
+        action_ActionB = factory.getInstance("ActionB");
+        action_ActionC = factory.getInstance("ActionC");
+        assertNotNull(action_ActionA);
+        assertNotNull(action_ActionB);
+        assertNotNull(action_ActionC);
+	}
+
+	public void test_BadActionsConfig() throws Exception {
+		// No action config of any description...
+		assertActionsConfigException("message1", "No actions configuration specified either on the listener or as a Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST].  Aborting message processing.");
+		
+		// Bad actions config as an override attachment - wrong type...
+		Message message = MessageFactory.getInstance().getMessage();
+		message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, new Long(1));
+		assertActionsConfigException(message, "Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST] must be of type java.lang.String.  Received [java.lang.Long].  Aborting message processing.");
+		
+		// Bad actions config as an override attachment - empty...
+		message = MessageFactory.getInstance().getMessage();
+		message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, " ");
+		assertActionsConfigException(message, "Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST] was specified but with an empty value.  Aborting message processing.");
+		
+		// Bad actions config on the listener - unknown action...
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionX");
+		assertActionsConfigException("message1", "Bad Listener Configuration.  No 'Actions/Action' definition for action [ActionX].");
+	}
+
+	public void test_ActionsListenerConfig() throws Exception {
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB");
+		
+		// Run the listener and check that the proper actions were run...
+		runListener("message1", null);
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+		assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+	}
+
+	public void test_ActionsOverrideConfig() throws Exception {
+		Message message = MessageFactory.getInstance().getMessage();
+
+		// Set the actions on both the listener config and on the message as an attachment...
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB");
+		message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, "ActionA, ActionC");
+		ActionUtils.putCurrentObject(message, "message1");
+		
+		// Run the listener and check that it was the attachment actions config that was used...
+		runListener(message, null);
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+		assertEquals(0, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+	}
+
+	public void test_PrematureTermination_By_Exception() throws Exception {
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+		
+		((BaseTestActionProcessor)action_ActionB.getProcessor()).exception = new ActionProcessingException("Premature termination by Exception!");
+		
+		// Run the listener and check that it failed and raised an appropriate error on the listener...
+		runListener("message1", "Premature termination by Exception!");
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+		assertEquals(0, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+		assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+	}
+
+	public void test_PrematureTermination_By_Null() throws Exception {
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+		
+		((BaseTestActionProcessor)action_ActionB.getProcessor()).returnNull = true;
+		
+		// Run the listener and check that it failed and raised an appropriate error on the listener...
+		runListener("message1", "Premature termination of action processing pipeline [[ActionA,  ActionB,  ActionC]].  ActionProcessor [org.jboss.soa.esb.actions.TestActionProcessor1] returned a null message result on processing of action [ActionB].");
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+		assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+	}
+
+	public void test_Last_Action_Returning_Null_OK() throws Exception {
+		listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+		
+		((BaseTestActionProcessor)action_ActionC.getProcessor()).returnNull = true;
+		
+		runListener("message1", null);
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+		assertEquals(1, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+	}
+
+	private void assertActionsConfigException(Object message, String expectedException) throws Exception {
+		MockNonblockingListener listener = new MockNonblockingListener(gpListener, listenerConfig, factory);
+		
+		listener.messages = new Object[] {message};
+		gpListener.continueLooping = true;
+		listener.run();
+		
+		assertEquals(0, listener.messagesCompleted.size());
+		assertEquals(1, listener.messagesInError.size());
+		assertEquals(message, listener.messagesInError.get(0).initialMsg);
+		assertEquals(null, listener.messagesInError.get(0).processor);
+		assertEquals(expectedException, listener.messagesInError.get(0).error.getMessage());
+	}
+
+	private MockNonblockingListener runListener(Object message, String expectedException) throws Exception {
+		MockNonblockingListener listener = new MockNonblockingListener(gpListener, listenerConfig, factory);
+		
+		listener.messages = new Object[] {message};
+		gpListener.continueLooping = true;
+		listener.run();
+
+		if(expectedException == null) {
+			assertEquals(1, listener.messagesCompleted.size());
+			assertEquals(0, listener.messagesInError.size());
+			assertEquals(message, listener.messagesCompleted.get(0));
+		} else {
+			assertEquals(0, listener.messagesCompleted.size());
+			assertEquals(1, listener.messagesInError.size());
+			assertEquals(message, listener.messagesInError.get(0).initialMsg);
+			assertEquals(expectedException, listener.messagesInError.get(0).error.getMessage());
+		}
+		
+		return listener;
+	}
+	
+	/**
+	 * Overriding the GpListener to get control over the continueLooping method.
+	 * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+	 */
+	private class MockGpListener extends GpListener {
+		private boolean continueLooping = true;
+		
+		public boolean continueLooping() {
+			try {
+				return continueLooping;
+			} finally {
+				continueLooping = false;
+			}
+		}		
+	}
+}

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml	2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,11 @@
+	<Actions>
+		<Action name="ActionA" processor="Processor1" />
+		<Action name="ActionB" processor="Processor2" />
+		<Action name="ActionC" processor="Processor3" />
+
+		<ProcessorAliases>
+			<Alias name="Processor1" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+			<Alias name="Processor2" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+			<Alias name="Processor3" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+		</ProcessorAliases>
+	</Actions>

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.util;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.AbstractListener;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Abstract Mock Listener implementation.
+ * <p/>
+ * Leaves you to implement the receive method in whatever way suits.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class AbstractMockListner extends AbstractListener {
+
+	private static Logger logger = Logger.getLogger(AbstractMockListner.class);
+	public List<MessageInError> messagesInError = new Vector<MessageInError>();
+	public List<Object> messagesCompleted = new Vector<Object>();
+	
+	public AbstractMockListner(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+		super(p_oDad, p_oParms, actionDefinitionFactory);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+	 */
+	@Override
+	protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+		logger.info("Received 'processingError' event. Message [" + initialMsg + "], ActionProcessor [" + processor + "], Throwable [" + error + "]", error);
+		messagesInError.add(new MessageInError(initialMsg, processor, error));
+	}
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+	 */
+	@Override
+	protected void processingComplete(Object initialMsg) {
+		logger.info("Received 'processingComplete' event. Message [" + initialMsg + "]");
+		messagesCompleted.add(initialMsg);
+	}
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+     */
+    @Override
+    protected void close() {
+    }
+
+	public class MessageInError {
+		public Object initialMsg; 
+		public ActionProcessor processor; 
+		public Throwable error;
+		public MessageInError(Object initialMsg, ActionProcessor processor, Throwable error) {
+			this.initialMsg = initialMsg;
+			this.processor = processor;
+			this.error = error;
+		}
+	}
+}

Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated 
+ * by the @authors tag. All rights reserved. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.util;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Non blocking mock Listener implementation.
+ * <p/>
+ * "Real" listener implementations need to perform blocking receives.  This can make testing
+ * a little awkward, especially where the listener is just one of the test fixtures.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockNonblockingListener extends AbstractMockListner {
+
+	public Object[] messages; // returned as-is by receive(); no initializer, so null until assigned
+	
+	public MockNonblockingListener(GpListener p_oDad, DomElement p_oParms,
+			ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+		super(p_oDad, p_oParms, actionDefinitionFactory); // all three arguments are forwarded unchanged
+	}
+
+	/* (non-Javadoc) Hands back the messages field directly; nothing here waits or blocks.
+	 * @see org.jboss.soa.esb.listeners.AbstractListener#receive()
+	 */
+	@Override
+	protected Object[] receive() {
+		return messages;
+	}
+
+}

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-10-10 12:57:04 UTC (rev 6718)
@@ -3,6 +3,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
@@ -20,6 +21,8 @@
 public class MockPoller extends AbstractPoller {
 
 	private static Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
+	public List<MessageInError> messagesInError = new Vector<MessageInError>(); // appended to by processingError(...)
+	public List<Object> messagesCompleted = new Vector<Object>(); // appended to by processingComplete(Object)
 	
 	public MockPoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
 		super(p_oDad, p_oParms, actionDefinitionFactory);
@@ -75,17 +78,30 @@
     protected void close() {
     }
 
-    /* (non-Javadoc)
-     * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
-     */
-    @Override
-    protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
-    }
+	/* Wraps the reported failure in a MessageInError and records it in messagesInError.
+	 * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable) */
+	@Override
+	protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+		MessageInError failure = new MessageInError(initialMsg, processor, error);
+		messagesInError.add(failure);
+	}
 
-    /* (non-Javadoc)
-     * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
-     */
-    @Override
-    protected void processingComplete(Object initialMessage) {
-    }
+	/* (non-Javadoc) Records the completed message in messagesCompleted.
+	 * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+	 */
+	@Override
+	protected void processingComplete(Object initialMsg) {
+		messagesCompleted.add(initialMsg);
+	}
+
+	public static class MessageInError { // static: reads no poller state, so it should not pin the enclosing instance
+		public final Object initialMsg; // message handed to processingError
+		public final ActionProcessor processor; // processor handed to processingError
+		public final Throwable error; // throwable handed to processingError
+		public MessageInError(Object initialMsg, ActionProcessor processor, Throwable error) {
+			this.initialMsg = initialMsg;
+			this.processor = processor;
+			this.error = error;
+		}
+	}
 }




More information about the jboss-svn-commits mailing list