[jboss-svn-commits] JBL Code SVN: r6718 - in labs/jbossesb/trunk/product/core/listeners: src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/actions tests/src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb/util
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Oct 10 08:57:13 EDT 2006
Author: tfennelly
Date: 2006-10-10 08:57:04 -0400 (Tue, 10 Oct 2006)
New Revision: 6718
Added:
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Log:
Added actions config override as requested by feature request http://jira.jboss.com/jira/browse/JBESB-176
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -25,9 +25,9 @@
import java.util.Arrays;
import org.apache.log4j.Logger;
-import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.ActionDefinition;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
import org.jboss.soa.esb.actions.ActionProcessor;
import org.jboss.soa.esb.actions.ActionUtils;
import org.jboss.soa.esb.helpers.DomElement;
@@ -40,7 +40,14 @@
* @since Version 4.0
*/
public abstract class AbstractListener implements Runnable {
-
+
+ /**
+ * Name constant def for the Message attachment carrying the list of actions to be applied to the
+ * incoming message. This allows the configured processing pipeline to be overridden by the Message
+ * producer.
+ */
+ public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
+
// You can override these values at constructor time of your
// derived class after calling super(GpListener,DomElement)
protected int m_iSleepForThreads = 3000; // default sleep if no threads available
@@ -66,12 +73,10 @@
m_oThrGrp = new ThreadGroup(listenerConfig.getName());
String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
- m_oActions = sAtt.split(",");
+ if(!sAtt.trim().equals("")) {
+ m_oActions = sAtt.split(",");
+ }
- if(m_oActions.length == 0) {
- throw new ConfigurationException("Listener 'actions' list must be specified.");
- }
-
sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
int iMax = Integer.parseInt(sAtt);
m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
@@ -86,10 +91,10 @@
public void run() {
while (m_oDad.continueLooping()) {
Object[] processList = receive();
- if (null==processList)
+ if (null==processList) {
try { Thread.sleep(500); }
catch(InterruptedException e) {/* ok do nothing */}
- else
+ } else {
for (Object currentObj : processList) {
if (m_iQthr >= m_iMaxThr) {
logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
@@ -104,9 +109,23 @@
// Spawn a thread and push the message message through the pipeline...
ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
new Thread(runner).start();
+ incThreads();
+ }
}
}
+ // Wait for all the processing pipelines to complete before closing the listener and exiting...
+ while(m_iQthr > 0) {
+ logger.info("Waiting for all processing pipelines to complete.");
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
+ }
+ }
+
+ logger.info("All processing pipelines complete. Closing listener now.");
+
close();
}
@@ -121,8 +140,9 @@
/**
* Called on the listener implementation when pipeline processing error has occured.
* @param initialMsg The message that was initialy supplied to the pipeline.
- * @param processor The processor raised the error.
- * @param error The error.
+ * @param processor The processor that raised the error. Can be null where the error was raised before
+ * pipeline processing of the message.
+ * @param error The error. Can be null.
*/
protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
@@ -164,7 +184,7 @@
*/
private class ActionProcessingPipeline implements Runnable {
- private Object initialObject;
+ private Object initialObject;
/**
* Private constructor.
@@ -181,15 +201,21 @@
String currentAction = null;
ActionProcessor currentProcessor = null;
- // Increment the active thread count for the listener on starting...
- incThreads();
-
try {
- Message message = m_oMsgFactory.getMessage();
- ActionUtils.putCurrentObject(message,initialObject);
-
+ Message message;
+ String[] actions;
+
+ if(initialObject instanceof Message) {
+ message = (Message)initialObject;
+ } else {
+ message = m_oMsgFactory.getMessage();
+ ActionUtils.putCurrentObject(message,initialObject);
+ }
+
+ actions = getActions(message);
+
// Run the message through each ActionProcessor...
- for(String action : m_oActions) {
+ for(String action : actions) {
ActionDefinition actionDefinition;
currentAction = action.trim();
@@ -209,25 +235,59 @@
}
if(message == null && action != m_oActions[m_oActions.length - 1]) {
- logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
- break;
+ String exceptionMessage = "Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
+ processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
+ logger.warn(exceptionMessage);
+ return;
}
// Notify on all processors. May want to do this differently in the future i.e. more selectively ...
GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
// Setup the message for processing by the next processor...
- message.getBody().remove(ActionUtils.BEFORE_ACTION);
+ if(message != null) {
+ message.getBody().remove(ActionUtils.BEFORE_ACTION);
+ }
}
+
+ processingComplete(initialObject);
} catch(Throwable thrown) {
processingError(initialObject, currentProcessor, thrown);
- logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
+ logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "]. Action [" + currentAction + "] threw an exception.", thrown);
+ } finally {
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
}
-
- processingComplete(initialObject);
-
- // Decrement the active thread count for the listener on completion...
- decThreads();
}
+
+ /**
+ * Get the list of actions to be applied to the supplied message.
+ * @param message The message to be processed.
+ * @return The set of processing actions to be performed on the message.
+ * @throws ActionProcessingException Invalid actions list attachment setting.
+ */
+ private String[] getActions(Message message) throws ActionProcessingException {
+ // Check whether there is an attachment specifying an override pipeline config...
+ Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
+ if(overrideActionsAttachment != null) {
+ if(overrideActionsAttachment instanceof String) {
+ String overrideActions = (String)overrideActionsAttachment;
+
+ if(overrideActions.trim().equals("")) {
+ throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value. Aborting message processing.");
+ }
+
+ return overrideActions.split(",");
+ } else {
+ throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String. Received [" + overrideActionsAttachment.getClass().getName() + "]. Aborting message processing.");
+ }
+ } else {
+ // Otherwise use the actions configured on the listener...
+ if(m_oActions == null || m_oActions.length == 0) {
+ throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "]. Aborting message processing.");
+ }
+ return m_oActions;
+ }
+ }
}
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -162,6 +162,12 @@
private static CommandQueue defaultCommandQueue = null;
/**
+ * Protected default constructor, for use by subclasses.
+ */
+ protected GpListener() {
+ }
+
+ /**
* Construct a Listener Manager from the named repository based
* configuration.
*
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/BaseTestActionProcessor.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -23,6 +23,9 @@
package org.jboss.soa.esb.actions;
import java.io.Serializable;
+import java.util.List;
+import java.util.Vector;
+
import org.jboss.soa.esb.message.Message;
/**
@@ -32,10 +35,26 @@
*/
public abstract class BaseTestActionProcessor implements ActionProcessor {
+ public ActionProcessingException exception;
+ public List<Message> processedMessages = new Vector<Message>();
+ public boolean returnNull = false;
+
/* (non-Javadoc)
* @see org.jboss.soa.esb.actions.ActionProcessor#process(java.lang.Object)
*/
- public abstract Message process(Message message) throws ActionProcessingException;
+ public Message process(Message message) throws ActionProcessingException {
+ if(exception != null) {
+ throw exception;
+ }
+
+ processedMessages.add(message);
+
+ if(returnNull) {
+ return null;
+ }
+
+ return message;
+ }
/* (non-Javadoc)
* @see org.jboss.soa.esb.actions.ActionProcessor#getOkNotification(java.lang.Object)
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor1.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -24,6 +24,7 @@
import java.util.List;
+import org.apache.log4j.Logger;
import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.message.Message;
@@ -35,12 +36,11 @@
public class TestActionProcessor1 extends BaseTestActionProcessor {
+ private static Logger logger = Logger.getLogger(TestActionProcessor1.class);
public String name;
public List<KeyValuePair> properties;
public TestActionProcessor1(String name, List<KeyValuePair> properties) {
- System.out.println("Instantiate action handler: " + name);
-
this.name = name;
this.properties = properties;
}
@@ -48,8 +48,8 @@
/* (non-Javadoc)
* @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
*/
- public Message process(Message message) {
- System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
- return message;
+ public Message process(Message message) throws ActionProcessingException {
+ logger.info("Processing action [" + name + "]: " + ActionUtils.currentFromMessage(message));
+ return super.process(message);
}
}
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor2.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -21,6 +21,8 @@
*/
package org.jboss.soa.esb.actions;
+
+import org.apache.log4j.Logger;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.actions.ActionUtils;
@@ -31,12 +33,14 @@
*/
public class TestActionProcessor2 extends BaseTestActionProcessor {
-
+
+ private static Logger logger = Logger.getLogger(TestActionProcessor2.class);
+
/* (non-Javadoc)
* @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
*/
- public Message process(Message message) {
- System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
- return message;
+ public Message process(Message message) throws ActionProcessingException {
+ logger.info("Processing action: " + ActionUtils.currentFromMessage(message));
+ return super.process(message);
}
}
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TestActionProcessor3.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -39,8 +39,8 @@
/* (non-Javadoc)
* @see org.jboss.soa.esb.actions.ActionProcessor#processAction(java.lang.Object)
*/
- public Message process(Message message) {
+ public Message process(Message message) throws ActionProcessingException {
System.out.println("processAction: " + ActionUtils.currentFromMessage(message));
- return message;
+ return super.process(message);
}
}
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListenerTest.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.actions.BaseTestActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.jboss.soa.esb.util.MockNonblockingListener;
+
+import junit.framework.TestCase;
+
+/**
+ * AbstractListener tests.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class AbstractListenerTest extends TestCase {
+
+ private MockGpListener gpListener;
+ private ActionDefinitionFactory factory;
+ private DomElement listenerConfig;
+ private ActionDefinition action_ActionA;
+ private ActionDefinition action_ActionB;
+ private ActionDefinition action_ActionC;
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ gpListener = new MockGpListener();
+ listenerConfig = new DomElement("listenerConfig");
+
+ DomElement config = DomElement.fromInputStream(getClass().getResourceAsStream("AbstractListener_ActionConfig.xml"));
+ factory = new ActionDefinitionFactory(config);
+ action_ActionA = factory.getInstance("ActionA");
+ action_ActionB = factory.getInstance("ActionB");
+ action_ActionC = factory.getInstance("ActionC");
+ assertNotNull(action_ActionA);
+ assertNotNull(action_ActionB);
+ assertNotNull(action_ActionC);
+ }
+
+ public void test_BadActionsConfig() throws Exception {
+ // No action config of any description...
+ assertActionsConfigException("message1", "No actions configuration specified either on the listener or as a Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST]. Aborting message processing.");
+
+ // Bad actions config as an override attachment - wrong type...
+ Message message = MessageFactory.getInstance().getMessage();
+ message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, new Long(1));
+ assertActionsConfigException(message, "Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST] must be of type java.lang.String. Received [java.lang.Long]. Aborting message processing.");
+
+ // Bad actions config as an override attachment - empty...
+ message = MessageFactory.getInstance().getMessage();
+ message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, " ");
+ assertActionsConfigException(message, "Message attachement [MESSAGE_PROCESSING_ACTIONS_LIST] was specified but with an empty value. Aborting message processing.");
+
+ // Bad actions config on the listener - unknown action...
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionX");
+ assertActionsConfigException("message1", "Bad Listener Configuration. No 'Actions/Action' definition for action [ActionX].");
+ }
+
+ public void test_ActionsListenerConfig() throws Exception {
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB");
+
+ // Run the listener and check that the proper actions were run...
+ runListener("message1", null);
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+ assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+ }
+
+ public void test_ActionsOverrideConfig() throws Exception {
+ Message message = MessageFactory.getInstance().getMessage();
+
+ // Set the actions on both the listener config and on the message as an attachment...
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB");
+ message.getAttachment().put(AbstractListener.MESSAGE_PROCESSING_ACTIONS_LIST, "ActionA, ActionC");
+ ActionUtils.putCurrentObject(message, "message1");
+
+ // Run the listener and check that it was the attachment actions config that was used...
+ runListener(message, null);
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+ assertEquals(0, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+ }
+
+ public void test_PrematureTermination_By_Exception() throws Exception {
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+
+ ((BaseTestActionProcessor)action_ActionB.getProcessor()).exception = new ActionProcessingException("Premature termination by Exception!");
+
+ // Run the listener and check that it failed and raised an appropriate error on the listener...
+ runListener("message1", "Premature termination by Exception!");
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+ assertEquals(0, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+ assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+ }
+
+ public void test_PrematureTermination_By_Null() throws Exception {
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+
+ ((BaseTestActionProcessor)action_ActionB.getProcessor()).returnNull = true;
+
+ // Run the listener and check that it failed and raised an appropriate error on the listener...
+ runListener("message1", "Premature termination of action processing pipeline [[ActionA, ActionB, ActionC]]. ActionProcessor [org.jboss.soa.esb.actions.TestActionProcessor1] returned a null message result on processing of action [ActionB].");
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+ assertEquals(0, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+ }
+
+ public void test_Last_Action_Returning_Null_OK() throws Exception {
+ listenerConfig.setAttr(GpListener.PARM_ACTIONS, "ActionA, ActionB, ActionC");
+
+ ((BaseTestActionProcessor)action_ActionC.getProcessor()).returnNull = true;
+
+ runListener("message1", null);
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionA.getProcessor()).processedMessages.size());
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionB.getProcessor()).processedMessages.size());
+ assertEquals(1, ((BaseTestActionProcessor)action_ActionC.getProcessor()).processedMessages.size());
+ }
+
+ private void assertActionsConfigException(Object message, String expectedException) throws Exception {
+ MockNonblockingListener listener = new MockNonblockingListener(gpListener, listenerConfig, factory);
+
+ listener.messages = new Object[] {message};
+ gpListener.continueLooping = true;
+ listener.run();
+
+ assertEquals(0, listener.messagesCompleted.size());
+ assertEquals(1, listener.messagesInError.size());
+ assertEquals(message, listener.messagesInError.get(0).initialMsg);
+ assertEquals(null, listener.messagesInError.get(0).processor);
+ assertEquals(expectedException, listener.messagesInError.get(0).error.getMessage());
+ }
+
+ private MockNonblockingListener runListener(Object message, String expectedException) throws Exception {
+ MockNonblockingListener listener = new MockNonblockingListener(gpListener, listenerConfig, factory);
+
+ listener.messages = new Object[] {message};
+ gpListener.continueLooping = true;
+ listener.run();
+
+ if(expectedException == null) {
+ assertEquals(1, listener.messagesCompleted.size());
+ assertEquals(0, listener.messagesInError.size());
+ assertEquals(message, listener.messagesCompleted.get(0));
+ } else {
+ assertEquals(0, listener.messagesCompleted.size());
+ assertEquals(1, listener.messagesInError.size());
+ assertEquals(message, listener.messagesInError.get(0).initialMsg);
+ assertEquals(expectedException, listener.messagesInError.get(0).error.getMessage());
+ }
+
+ return listener;
+ }
+
+ /**
+ * Overriding the GpListener to get control over the continueLooping method.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+ private class MockGpListener extends GpListener {
+ private boolean continueLooping = true;
+
+ public boolean continueLooping() {
+ try {
+ return continueLooping;
+ } finally {
+ continueLooping = false;
+ }
+ }
+ }
+}
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/AbstractListener_ActionConfig.xml 2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,11 @@
+ <Actions>
+ <Action name="ActionA" processor="Processor1" />
+ <Action name="ActionB" processor="Processor2" />
+ <Action name="ActionC" processor="Processor3" />
+
+ <ProcessorAliases>
+ <Alias name="Processor1" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+ <Alias name="Processor2" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+ <Alias name="Processor3" class="org.jboss.soa.esb.actions.TestActionProcessor1" />
+ </ProcessorAliases>
+ </Actions>
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/AbstractMockListner.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.util;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.AbstractListener;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Abstract Mock Listener implementation.
+ * <p/>
+ * Leaves you to implement the receive method in whatever way suits.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class AbstractMockListner extends AbstractListener {
+
+ private static Logger logger = Logger.getLogger(AbstractMockListner.class);
+ public List<MessageInError> messagesInError = new Vector<MessageInError>();
+ public List<Object> messagesCompleted = new Vector<Object>();
+
+ public AbstractMockListner(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(p_oDad, p_oParms, actionDefinitionFactory);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ logger.info("Received 'processingError' event. Message [" + initialMsg + "], ActionProcessor [" + processor + "], Throwable [" + error + "]", error);
+ messagesInError.add(new MessageInError(initialMsg, processor, error));
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMsg) {
+ logger.info("Received 'processingComplete' event. Message [" + initialMsg + "]");
+ messagesCompleted.add(initialMsg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ }
+
+ public class MessageInError {
+ public Object initialMsg;
+ public ActionProcessor processor;
+ public Throwable error;
+ public MessageInError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ this.initialMsg = initialMsg;
+ this.processor = processor;
+ this.error = error;
+ }
+ }
+}
Added: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNonblockingListener.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.soa.esb.util;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Non blocking mock Listener implementation.
+ * <p/>
+ * "Real" listener implementations need to perform blocking receives. This can make testing
+ * a little awkward, especially where the listener is just one of the test fixtures.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockNonblockingListener extends AbstractMockListner {
+
+ public Object[] messages;
+
+ public MockNonblockingListener(GpListener p_oDad, DomElement p_oParms,
+ ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(p_oDad, p_oParms, actionDefinitionFactory);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#receive()
+ */
+ @Override
+ protected Object[] receive() {
+ return messages;
+ }
+
+}
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-10-10 10:34:48 UTC (rev 6717)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-10-10 12:57:04 UTC (rev 6718)
@@ -3,6 +3,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
+import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
@@ -20,6 +21,8 @@
public class MockPoller extends AbstractPoller {
private static Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
+ public List<MessageInError> messagesInError = new Vector<MessageInError>();
+ public List<Object> messagesCompleted = new Vector<Object>();
public MockPoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
super(p_oDad, p_oParms, actionDefinitionFactory);
@@ -75,17 +78,30 @@
protected void close() {
}
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
- */
- @Override
- protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
- }
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ messagesInError.add(new MessageInError(initialMsg, processor, error));
+ }
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
- */
- @Override
- protected void processingComplete(Object initialMessage) {
- }
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMsg) {
+ messagesCompleted.add(initialMsg);
+ }
+
+ public class MessageInError {
+ public Object initialMsg;
+ public ActionProcessor processor;
+ public Throwable error;
+ public MessageInError(Object initialMsg, ActionProcessor processor, Throwable error) {
+ this.initialMsg = initialMsg;
+ this.processor = processor;
+ this.error = error;
+ }
+ }
}
More information about the jboss-svn-commits
mailing list