[jboss-svn-commits] JBL Code SVN: r6109 - in labs/jbossesb/workspace/tfennelly/product/core/listeners: src/org/jboss/soa/esb src/org/jboss/soa/esb/actions src/org/jboss/soa/esb/command src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb tests/src/org/jboss/soa/esb/actions tests/src/org/jboss/soa/esb/command tests/src/org/jboss/soa/esb/util

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Sep 7 12:54:29 EDT 2006


Author: tfennelly
Date: 2006-09-07 12:54:21 -0400 (Thu, 07 Sep 2006)
New Revision: 6109

Added:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Modified:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Log:
Add the CommandQueue abstraction + the Jms and InMemory impls.
Started adding the TransformAction stuff.

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,29 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+public class TransformAction extends AbstractAction {
+
+	public TransformAction(DomElement actionConfig, Object p_oCurr) {
+		super(actionConfig, p_oCurr);
+		// NOTE(review): dumps the action configuration to stdout - presumably temporary debug output.
+		System.out.println(actionConfig.toString());
+	}
+
+	@Override
+	public void processCurrentObject() throws Exception {
+		// Intentionally empty: no processing is implemented yet.
+	}
+
+	@Override
+	public Serializable getOkNotification() {
+		return "OK";
+	}
+
+	@Override
+	public Serializable getErrorNotification() {
+		return "Error";
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,33 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Command queue abstraction: a source of textual commands that can be opened, read from and closed.
+ * @author tfennelly
+ */
+public interface CommandQueue {
+
+	/**
+	 * Open the command queue.
+	 * @param config Command queue configuration.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public void open(DomElement config) throws CommandQueueException;
+	
+	/**
+	 * Receive a message from the underlying queue implementation.
+	 * <p/>
+	 * Performs a blocking receive on the command queue, controlled by the receive timeout.
+	 * @param timeout The receive block timeout.  Zero to block indefinitely.
+	 * @return The command message from the queue.  May be null, e.g. the JMS implementation returns null when it receives no text message.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public String receiveCommand(long timeout) throws CommandQueueException;
+	
+	/**
+	 * Close the command queue.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public void close() throws CommandQueueException;
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,28 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Exception raised by {@link CommandQueue} operations (open, receive, close).
+ * @author tfennelly
+ */
+public class CommandQueueException extends BaseException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CommandQueueException() {
+		super();
+	}
+
+	public CommandQueueException(String message) {
+		super(message);
+	}
+
+	public CommandQueueException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CommandQueueException(Throwable cause) {
+		super(cause);
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,105 @@
+package org.jboss.soa.esb.command;
+
+import java.util.Hashtable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * In Memory Blocking Command Queue.
+ * <p/>
+ * Suitable for testing or any other purpose.
+ * <p/>
+ * The command queue's configuration needs to specify the
+ * queue name via a "command-queue-name" attribute supplied in the configuration to the
+ * {@link #open(DomElement)} method.  The queues are stored statically and can be accessed via the
+ * {@link #getQueue(String)} method using the queue name.
+ * @author tfennelly
+ */
+public class InMemoryCommandQueue implements CommandQueue {
+
+	/**
+	 * Command queue name attribute name.
+	 */
+	public static final String COMMAND_QUEUE_NAME = "command-queue-name";
+
+	/** Open queues, keyed by the name they were opened under. */
+	private static final Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
+	
+	/** Name this queue was opened under; null until {@link #open(DomElement)} succeeds. */
+	private String name;
+	/** Pending commands, in arrival order. */
+	private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+	
+	/**
+	 * Open the queue and register it under the configured name.
+	 * @param config Configuration carrying the "command-queue-name" attribute.
+	 * @throws IllegalArgumentException If config is null.
+	 * @throws CommandQueueException If the queue name attribute is not specified.
+	 */
+	public void open(DomElement config) throws CommandQueueException {
+		if(config == null) {
+			throw new IllegalArgumentException("null 'config' arg in method call.");
+		}
+		
+		name = config.getAttr(COMMAND_QUEUE_NAME);
+		if(name == null) {
+			throw new CommandQueueException("Attribute 'command-queue-name' must be specified on the command queue configuration.");
+		}
+		commandQueues.put(name, this);
+	}
+
+	/**
+	 * Add a command to the in-memory command queue.
+	 * @param command The command string.
+	 */
+	public void addCommand(String command) {
+		queue.add(command);
+	}
+	
+	/**
+	 * Receive the next command, waiting for one to arrive if necessary.
+	 * @param timeout Maximum time to wait, in milliseconds.  Zero to block indefinitely.
+	 * @return The next command, or null if the timeout expired before one arrived.
+	 * @throws CommandQueueException If the queue is not open, or the wait was interrupted.
+	 */
+	public String receiveCommand(long timeout) throws CommandQueueException {
+		if(name == null || !commandQueues.containsKey(name)) {
+			throw new CommandQueueException("Sorry.  Invalid call to 'receiveCommand' method.  Queue is not open!");
+		}
+		
+		try {
+			if(timeout == 0) {
+				// Zero means "no timeout": block until a command arrives.
+				return queue.take();
+			}
+			// Honour the timeout; poll returns null once it expires.
+			return queue.poll(timeout, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so code further up the stack can see the interruption.
+			Thread.currentThread().interrupt();
+			throw new CommandQueueException("Error taking command message from command queue.", e);
+		}
+	}
+
+	/**
+	 * Close the queue, removing it from the static registry.  Does nothing if the queue was never opened.
+	 */
+	public void close() throws CommandQueueException {
+		// Hashtable rejects null keys, so a never-opened queue must not reach remove().
+		if(name != null) {
+			commandQueues.remove(name);
+		}
+	}
+	
+	/**
+	 * Get the command queue based on the name supplied in the configuration ("command-queue-name").
+	 * @param name The name of the queue ala the "command-queue-name" attribute on the queue configuration.
+	 * @return The InMemoryCommandQueue instance, or null if no such queue exists.
+	 */
+	public static InMemoryCommandQueue getQueue(String name) {
+		return (name == null) ? null : commandQueues.get(name);
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,206 @@
+package org.jboss.soa.esb.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * JMS based Command Queue implementation.
+ * <p/>
+ * This code was simply pulled from the GpListener.
+ * <p/>
+ * If the configuration does not name a JMS destination ("commandJndiName"), the queue opens
+ * without a consumer and {@link #receiveCommand(long)} just waits out the timeout.
+ * @author tfennelly
+ */
+public class JmsCommandQueue implements CommandQueue {
+
+	private static Logger logger = Logger.getLogger(JmsCommandQueue.class);
+
+	public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+
+	public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+
+	public static final String COMMAND_JNDI_URL = "commandJndiURL";
+
+	public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+
+	public static final String COMMAND_JNDI_NAME = "commandJndiName";
+
+	public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+	
+	/** Consumer on the command destination; null when no destination is configured. */
+	private MessageConsumer m_oCmdSrc;
+
+	private Session m_oJmsSess;
+
+	private Connection m_oJmsConn;
+
+	/** True between a successful open() and the next close(). */
+	private boolean m_bOpened;
+	
+	/**
+	 * Open the command queue, connecting to the configured JMS destination (if any).
+	 * @param config Command queue configuration.
+	 * @throws CommandQueueException If the JMS resources cannot be looked up or created.
+	 */
+	public void open(DomElement config) throws CommandQueueException {
+		try {
+			initialiseJMS(config);
+			m_bOpened = true;
+		} catch (Exception e) {
+			// Release anything acquired before the failure so the connection is not leaked.
+			close();
+			throw new CommandQueueException("Failed to initialise JMS Command Queue.", e);
+		}
+	}
+
+	/**
+	 * Close the JMS session and connection.  Close failures are logged and otherwise ignored.
+	 */
+	public void close() throws CommandQueueException {
+		m_bOpened = false;
+		if (null != m_oJmsSess) {
+			try {
+				m_oJmsSess.close();
+			} catch (JMSException eS) {
+				// Best effort - carry on so the connection still gets closed.
+				logger.debug("Failed to close JMS session.", eS);
+			}
+		}
+		if (null != m_oJmsConn) {
+			try {
+				m_oJmsConn.close();
+			} catch (JMSException eC) {
+				// Best effort - nothing more can be done.
+				logger.debug("Failed to close JMS connection.", eC);
+			}
+		}
+		// Drop the references so a closed queue never touches stale JMS objects.
+		m_oCmdSrc = null;
+		m_oJmsSess = null;
+		m_oJmsConn = null;
+	}
+
+	/**
+	 * Receive the next command from the JMS destination.
+	 * @param timeout The receive block timeout in milliseconds.  Zero to block indefinitely.
+	 * @return The text of the received message, or null if nothing was received within the
+	 *         timeout, the message was not a TextMessage, or no destination is configured.
+	 * @throws CommandQueueException If the queue is not open or the receive fails.
+	 */
+	public String receiveCommand(long timeout) throws CommandQueueException {
+		if (!m_bOpened) {
+			throw new CommandQueueException("Invalid call to 'receiveCommand' method.  Queue is not open!");
+		}
+		if (null == m_oCmdSrc) {
+			// No destination configured, so no command can arrive.  Wait out the timeout (as the
+			// listener did before the queue was factored out) rather than failing, which would
+			// make the caller's receive loop spin.
+			try {
+				if (timeout > 0) {
+					Thread.sleep(timeout);
+				}
+			} catch (InterruptedException e) {
+				// Restore the interrupt flag and return early.
+				Thread.currentThread().interrupt();
+			}
+			return null;
+		}
+
+		try {
+			Message jmsMessage = m_oCmdSrc.receive(timeout);
+			
+			if (null == jmsMessage)
+				return null;
+			if (jmsMessage instanceof TextMessage) {
+				return ((TextMessage)jmsMessage).getText();
+			} else {
+				logger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+			}
+		} catch(Exception e) {
+			throw new CommandQueueException("Exception receiving message from JMS Command Queue.", e);
+		}
+		
+		return null;
+	}
+
+	/**
+	 * Look up the connection factory and destination in JNDI and create the consumer.
+	 * Does nothing when no "commandJndiName" attribute is configured.
+	 */
+	private void initialiseJMS(DomElement p_oP) throws Exception {
+		// Only check for JMS attributes if a queue JNDI name was specified
+		String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+		if (!Util.isNullString(sJndiName)) {
+			Map<String, Object> oNewAtts = new HashMap<String, Object>();
+
+			oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
+
+			String sJndiType = GpListener.obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
+			oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
+			String sJndiURL = GpListener.obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
+			oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
+			Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+					sJndiURL);
+
+			String sFactClass = GpListener.obtainAtt(p_oP, COMMAND_CONN_FACTORY,
+					"ConnectionFactory");
+			oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
+			if (Util.isNullString(sFactClass))
+				sFactClass = "ConnectionFactory";
+			Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+			String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+			if (null != sMsgSelector)
+				oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
+
+			boolean bIsTopic = Boolean.parseBoolean(GpListener.obtainAtt(p_oP,
+					COMMAND_IS_TOPIC, "false"));
+			if (bIsTopic) {
+				TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
+				TopicConnection oTC = tcf.createTopicConnection();
+				// Record the connection straight away so close() can release it if a later step fails
+				m_oJmsConn = oTC;
+				Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+				TopicSession oSess = oTC.createTopicSession(false,
+						Session.AUTO_ACKNOWLEDGE);
+				m_oJmsSess = oSess;
+				oTC.start();
+				m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
+			} else {
+				QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
+				QueueConnection oQC = qcf.createQueueConnection();
+				// Record the connection straight away so close() can release it if a later step fails
+				m_oJmsConn = oQC;
+				javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
+						.lookup(sJndiName);
+				QueueSession oSess = oQC.createQueueSession(false,
+						Session.AUTO_ACKNOWLEDGE);
+				m_oJmsSess = oSess;
+				oQC.start();
+				m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+			}
+		}
+	}
+}

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -22,22 +22,28 @@
 
 package org.jboss.soa.esb.listeners;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
 import java.text.SimpleDateFormat;
-import java.util.*;
-import java.lang.reflect.*;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
-import javax.jms.*;
-import javax.naming.*;
-
-import org.apache.log4j.*;
-
+import org.apache.log4j.Logger;
 import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.command.CommandQueue;
+import org.jboss.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.command.JmsCommandQueue;
 import org.jboss.soa.esb.common.SystemProperties;
-import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.helpers.DomElement;
 import org.jboss.soa.esb.notification.NotificationList;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.InotificationHandler;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
 import org.jboss.soa.esb.util.Util;
 import org.xml.sax.SAXException;
 
@@ -83,18 +89,6 @@
 												// parameter reloads
 	;
 
-	public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
-
-	public static final String COMMAND_JNDI_TYPE = "commandJndiType";
-
-	public static final String COMMAND_JNDI_URL = "commandJndiURL";
-
-	public static final String COMMAND_IS_TOPIC = "commandIsTopic";
-
-	public static final String COMMAND_JNDI_NAME = "commandJndiName";
-
-	public static final String COMMAND_MSG_SELECTOR = "messageSelector";
-
 	public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
 
 	public static final String PARM_END_TIME = "endTime";
@@ -164,12 +158,8 @@
 		}
 	};
 
-	private MessageConsumer m_oCmdSrc;
+	private CommandQueue commandQueue;
 
-	private Session m_oJmsSess;
-
-	private Connection m_oJmsConn;
-
 	/**
 	 * Construct a Listener Manager from the named repository based
 	 * configuration.
@@ -253,66 +243,18 @@
 	public void checkParms(DomElement p_oP) throws Exception {
 		// We've just loaded - set to false until next reload requested
 		m_bReloadRequested = false;
-		m_oCmdSrc = null;
+		commandQueue = createCommandQueue(p_oP);
 
-		Map<String, Object> oNewAtts = new HashMap<String, Object>();
+		// Open the command queue...
+		commandQueue.open(p_oP);
 
-		// Only check for JMS attributes if a queue JNDI name was specified
-		String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
-		if (!Util.isNullString(sJndiName)) {
-			oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
-
-			String sJndiType = obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
-			oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
-			String sJndiURL = obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
-			oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
-			Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
-					sJndiURL);
-
-			String sFactClass = obtainAtt(p_oP, COMMAND_CONN_FACTORY,
-					"ConnectionFactory");
-			oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
-			if (Util.isNullString(sFactClass))
-				sFactClass = "ConnectionFactory";
-			Object oFactCls = oJndiCtx.lookup(sFactClass);
-
-			String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
-			if (null != sMsgSelector)
-				oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
-
-			boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,
-					COMMAND_IS_TOPIC, "false"));
-			if (bIsTopic) {
-				TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
-				TopicConnection oTC = tcf.createTopicConnection();
-				Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
-				TopicSession oSess = oTC.createTopicSession(false,
-						TopicSession.AUTO_ACKNOWLEDGE);
-				m_oJmsConn = oTC;
-				m_oJmsSess = oSess;
-				oTC.start();
-				m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
-			} else {
-				QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
-				QueueConnection oQC = qcf.createQueueConnection();
-				javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
-						.lookup(sJndiName);
-				QueueSession oSess = oQC.createQueueSession(false,
-						TopicSession.AUTO_ACKNOWLEDGE);
-				oQC.start();
-				m_oJmsConn = oQC;
-				m_oJmsSess = oSess;
-				m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
-			}
-		}
-
 		// if PARM_RELOAD_SECS not set, and no command queue
 		// then reload every 10 minutes
 		// If there is a command queue, run until command is received
 		String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
 		m_lNextReload = (null != sRldSecs) ? System.currentTimeMillis() + 1000
 				* Long.parseLong(sRldSecs)
-				: (null == m_oCmdSrc) ? Long.MAX_VALUE : System
+				: (null == commandQueue) ? Long.MAX_VALUE : System
 						.currentTimeMillis()
 						+ m_iDfltReloadMillis;
 
@@ -325,6 +267,21 @@
 
 	} // ________________________________
 
+	private CommandQueue createCommandQueue(DomElement config) {
+		String commandQueueClass = config.getAttr("command-queue-class");
+		// Optional attribute: fully qualified name of the CommandQueue implementation to instantiate.
+		if(commandQueueClass != null) {
+			try {
+				return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+			} catch (Exception e) {
+				m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "].  Defaulting to the JMS Command Queue", e);
+			}
+		}
+		
+		// No class configured, or it could not be instantiated: fall back to the JMS command queue.
+		return new JmsCommandQueue();
+	}
+
 	/**
 	 * Main execution loop <p/> Will continue to run until either <p/>a) run
 	 * time is expired <p/>b) quiesce command is received in command queue
@@ -373,16 +330,12 @@
 		m_oLogger
 				.info("Finishing_____________________________________________________");
 
-		if (null != m_oJmsSess)
-			try {
-				m_oJmsSess.close();
-			} catch (JMSException eS) {/* Tried my best - Just continue */
-			}
-		if (null != m_oJmsConn)
-			try {
-				m_oJmsConn.close();
-			} catch (JMSException eC) {/* Tried my best - Just continue */
-			}
+		// Close the command queue...
+		try {
+			commandQueue.close();
+		} catch (CommandQueueException e) {
+			m_oLogger.error("Error closing Command Queue.", e);
+		}
 	} // ________________________________
 
 	private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
@@ -405,7 +358,7 @@
 	private void waitForCmdOrSleep() {
 		long lToGo = millisToWait();
 
-		if (null == m_oCmdSrc) {
+		if (null == commandQueue) {
 			m_oLogger.debug("About to sleep " + lToGo);
 			// No command queue nor topic - Just sleep until time
 			// exhausted, or thread interrupted
@@ -422,22 +375,17 @@
 		// that's why time to go is recalculated on each cycle
 		while ((lToGo = millisToWait()) > 0) {
 			try {
-				m_oLogger.info("Waiting for command ... timeout=" + lToGo
-						+ " millis");
-				// for the time being, only text messages allowed
-				// THIS WILL CHANGE !!
-				Message oM = m_oCmdSrc.receive(lToGo);
-				if (null == oM)
+				m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+				String oM = commandQueue.receiveCommand(lToGo);
+				if (null == oM) {
 					return;
-				if (!(oM instanceof TextMessage)) {
-					m_oLogger
-							.warn("Message in command queue IGNORED - should be instanceof TextMessage");
-					return;
 				}
-				processCommand((TextMessage) oM);
-				if (endRequested() || timeToReload())
+				processCommand(oM);
+				if (endRequested() || timeToReload()) {
 					break;
-			} catch (JMSException eJ) {
+				}
+			} catch (CommandQueueException eJ) {
 				m_oLogger.info("receive on command queue failed", eJ);
 			}
 		}
@@ -472,40 +420,37 @@
 	 * </TABLE> * startsWith() <p/>
 	 * 
 	 * @param p_oMsg
-	 *            TextMessage - Received in command queue/topic
+	 *            Message received from the command queue.
 	 * 
 	 */
-	private void processCommand(TextMessage p_oMsg) {
-		try {
-			String sTxt = p_oMsg.getText();
-			if (null == sTxt)
-				return;
-			String sLow = sTxt.trim().toLowerCase();
-			if (sLow.startsWith("shutdown")) {
-				m_bEndRequested = true;
-				m_oLogger.info("Shutdown has been requested");
-				return;
+	private void processCommand(String sTxt) {
+		if (null == sTxt)
+			return;
+		
+		String sLow = sTxt.trim().toLowerCase();
+		if (sLow.startsWith("shutdown")) {
+			m_bEndRequested = true;
+			m_oLogger.info("Shutdown has been requested");
+			return;
+		}
+		if (sLow.startsWith("reload param")) {
+			m_bReloadRequested = true;
+			m_oLogger
+					.info("Request for parameter reload has been received");
+			return;
+		}
+		String[] sa = sLow.split("\\s+");
+		if (sa.length > 1 && "endtime".equals(sa[0])) {
+			try {
+				String sDate = sa[1];
+				String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+						: sa[2];
+				Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+				m_oLogger.info("New end date set to : " + oEnd);
+				m_lEndTime = oEnd.getTime();
+			} catch (Exception eDat) {
+				m_oLogger.info("Problems with endTime command", eDat);
 			}
-			if (sLow.startsWith("reload param")) {
-				m_bReloadRequested = true;
-				m_oLogger
-						.info("Request for parameter reload has been received");
-				return;
-			}
-			String[] sa = sLow.split("\\s+");
-			if (sa.length > 1 && "endtime".equals(sa[0]))
-				try {
-					String sDate = sa[1];
-					String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
-							: sa[2];
-					Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
-					m_oLogger.info("New end date set to : " + oEnd);
-					m_lEndTime = oEnd.getTime();
-				} catch (Exception eDat) {
-					m_oLogger.info("Problems with endTime command", eDat);
-				}
-		} catch (JMSException eJ) {
-			m_oLogger.info("Problems with command queue", eJ);
 		}
 	} // ________________________________
 
@@ -584,7 +529,7 @@
 	 *             If requested attribute not found and no default value
 	 *             supplied by invoker
 	 */
-	static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+	public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
 			throws Exception {
 		String sVal = p_oP.getAttr(p_sAtt);
 		if ((null == sVal) && (null == p_sDefault))

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,19 @@
+<TransformActionListener
+	command-queue-class="org.jboss.soa.esb.command.InMemoryCommandQueue"
+	command-queue-name="test-queue"
+>
+	<ListenerConfig
+		listenerClass="org.jboss.soa.esb.util.MockPoller"
+		actionClass="org.jboss.soa.esb.actions.TransformAction"
+		maxThreads="1"
+	>
+		<NotificationList type="OK">
+			<target class="org.jboss.soa.esb.util.MockNotificationTarget" name="ok-target" />
+		</NotificationList>
+	
+		<NotificationList type="err">
+			<target class="org.jboss.soa.esb.util.MockNotificationTarget" name="err-target" />
+		</NotificationList>
+	</ListenerConfig>
+
+</TransformActionListener>

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,18 @@
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.util.ListenersManagerExecThread;
+
+import junit.framework.TestCase;
+
+public class TransformActionListenerUnitTest extends TestCase {
+
+	public void test() throws Exception {
+		DomElement config = DomElement.fromInputStream(getClass().getResourceAsStream("TransformActionListener-Config-01.xml"));
+		GpListener listenerManager = new GpListener(config);
+		ListenersManagerExecThread execThread = new ListenersManagerExecThread(listenerManager);
+		
+		// TODO: work in progress - the exec thread is created but never started, and nothing is asserted yet.
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,10 @@
+package org.jboss.soa.esb.actions;
+
+import junit.framework.TestCase;
+
+public class TransformActionUnitTest extends TestCase {
+
+	public void test() {
+		// TODO: work in progress - placeholder, no assertions yet.
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,136 @@
+package org.jboss.soa.esb.command;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for {@link InMemoryCommandQueue}.
+ */
+public class InMemoryCommandQueueUnitTest extends TestCase {
+
+	/** open() must reject a null config and a config without the queue name attribute. */
+	public void test_args() throws CommandQueueException {
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+		
+		try {
+			commandQueue.open(null);
+			fail("Expected IllegalArgumentException.");
+		} catch (IllegalArgumentException e) {
+			// OK
+		}
+
+		DomElement config = new DomElement("config");
+		try {
+			commandQueue.open(config);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+	}
+	
+	/** A queue is reachable through getQueue() only between open() and close(). */
+	public void test_queue_open_close() throws CommandQueueException {
+		DomElement config = new DomElement("config");
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+		config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+		assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+		commandQueue.open(config);
+		assertEquals(commandQueue, InMemoryCommandQueue.getQueue("test-queue"));
+		commandQueue.close();
+		assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+	}
+	
+	/** Commands added to an open queue are delivered, in order, to a blocked consumer thread. */
+	public void test_queue_receive() throws CommandQueueException, InterruptedException {
+		DomElement config = new DomElement("config");
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+		// receive should fail if the queue hasn't been opened yet...
+		try {
+			commandQueue.receiveCommand(0);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+		
+		config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+		commandQueue.open(config);
+		
+		// Start the consumer thread - it will receive the commands from the queue.
+		CommandConsumerThread consumerThread = new CommandConsumerThread(commandQueue);
+		consumerThread.start();
+		
+		// Make sure the thread is running.
+		assertTrue(consumerThread.isRunning);
+		
+		commandQueue.addCommand("command1");
+		assertCommandReceived(consumerThread, "command1", 0);
+		commandQueue.addCommand("command2");
+		assertCommandReceived(consumerThread, "command2", 1);
+		commandQueue.addCommand("command3");
+		assertCommandReceived(consumerThread, "command3", 2);
+		
+		// Stop the queue thread...
+		commandQueue.addCommand("stop");
+		// Wait for the consumer to finish rather than sleeping a fixed time (bounded, so a hang still fails).
+		consumerThread.join(2000);
+		assertTrue(!consumerThread.isRunning);  // this flag being reset proves the stop command was consumed and so the queue is really working
+		assertEquals(4, consumerThread.unblockCount); // Should have unblocked 4 times - once for each command.
+		
+		// receive should fail if the queue has been closed...
+		commandQueue.close();
+		try {
+			commandQueue.receiveCommand(0);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+	}
+	
+	/** Give the consumer a moment, then check that the command at the given index is the expected one. */
+	private void assertCommandReceived(CommandConsumerThread consumerThread, String expected, int index) throws InterruptedException {
+		Thread.sleep(50);
+		assertEquals("Received commands queue is not the expected length.", index + 1, consumerThread.commandsReceived.size());
+		assertEquals("Command not found as last added command.", expected, consumerThread.commandsReceived.get(index));
+	}
+
+	/**
+	 * Consumer that records every command it receives until it sees "stop".
+	 * <p/>
+	 * Its state is read by the test thread, so the fields are volatile / synchronized to make the
+	 * consumer's writes visible there.
+	 */
+	private class CommandConsumerThread extends Thread {
+
+		private final List<String> commandsReceived = Collections.synchronizedList(new ArrayList<String>());
+		private final CommandQueue commandQueue;
+		private volatile boolean isRunning = true;
+		private volatile int unblockCount = 0;
+		
+		private CommandConsumerThread(CommandQueue commandQueue) {
+			this.commandQueue = commandQueue;
+		}
+		
+		@Override
+		public void run() {
+			String command = null;
+			
+			while(!"stop".equals(command)) {
+				try {
+					command = commandQueue.receiveCommand(0);
+					commandsReceived.add(command);
+				} catch (CommandQueueException e) {
+					fail("CommandQueue Exception: " + e.getMessage());
+				}
+				unblockCount++; // only this thread writes the counter
+			}
+			isRunning = false;
+		}		
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,90 @@
+package org.jboss.soa.esb.util;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.listeners.GpListener;
+
+public class ListenersManagerExecThread extends Thread {
+	
+	private static Logger logger = Logger.getLogger(ListenersManagerExecThread.class);
+	private GpListener listenersManager;
+
+	public ListenersManagerExecThread(GpListener listenersManager) {
+		super(listenersManager);
+		this.listenersManager = listenersManager;
+	}
+
+	@Override
+	public synchronized void start() {
+		logger.info("Waiting on Listener Manager the start...");
+		super.start();
+		while(listenersManager.getState() != GpListener.State.Running) {
+			try {
+				sleep(50);
+			} catch (InterruptedException e) {
+				throw new IllegalStateException("Unexpected Thread Interrupt exception.", e);
+			}
+			if(listenersManager.getState() == GpListener.State.Exception_thrown) {
+				Exception e = listenersManager.getState().getException();
+				logger.error("Failed to start the Listener Manager!", e);
+				TestCase.fail(e.getMessage());
+			}
+		}
+		logger.info("Listener Manager running (Thread: " + getName() + ")!  Note this does not mean all the Listeners are up and running!");
+	}
+
+
+	/**
+	 * Get the {@link GpListener} Listeners Manager class executing in this thread.
+	 * @return The listenersManager property value.
+	 */
+	public GpListener getListenersManager() {
+		return listenersManager;
+	}
+
+	/**
+	 * Assert that the listener Manager is in an Exception state..
+	 */
+	public void asserttInException() {
+		if(listenersManager.getState() != GpListener.State.Exception_thrown) {
+			String errorMsg = "ListenerManager not in Exception state.  Listener Manager Thread: " + this.getName();
+			logger.error(errorMsg);
+			TestCase.fail(errorMsg);
+		}
+	}
+
+	/**
+	 * Assert that the listener Manager is not in an Exception state..
+	 */
+	public void assertNotInException() {
+		if(listenersManager.getState() == GpListener.State.Exception_thrown) {
+			String errorMsg = "ListenerManager in Exception state.  See log.  Listener Manager Thread: " + this.getName();
+			logger.error(errorMsg, listenersManager.getState().getException());
+			TestCase.fail(errorMsg);
+		}
+	}
+
+	/**
+	 * Assert that the listener Manager has shutdown.
+	 * @param maxWait The maximum length of time (ms) to wait for shutdown before failing the test.
+	 */
+	public void assertShutdownOK(long maxWait) {
+		long endTime = System.currentTimeMillis() + maxWait;
+		
+		while(System.currentTimeMillis() < endTime) {
+			if(listenersManager.getState() == GpListener.State.Done_OK) {
+				logger.info("Shutdown was successful.  Listener Manager Thread: " + this.getName());
+				return;
+			}
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+				logger.error("Thread interupt...", e);
+			}
+		}
+		String errorMsg = "ListenerManager failed to shutdown as requested.  Waited for " + maxWait + "ms.  Listener Manager Thread: " + this.getName();
+		logger.error(errorMsg);
+		TestCase.fail(errorMsg);
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,68 @@
+package org.jboss.soa.esb.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.notification.NotificationTarget;
+
+/**
+ * Mock NotificationTarget Implementation.
+ * <p/>
+ * Configured by giving the target output list a 'name'.  Notifications are statically accessed via the static
+ * {@link #getTargetList(String)} method, supplying the target list name.
+ * <p/>
+ * Sample config:
+ * <pre>
+ * &lt;NotificationList type="OK"&gt;
+ * 	&lt;target class="org.jboss.soa.esb.util.MockNotificationTarget" <b>name="ok-target"</b> /&gt;
+ * &lt;/NotificationList&gt;
+ * </pre>
+ * @author tfennelly
+ */
+public class MockNotificationTarget extends NotificationTarget {
+
+	private static Hashtable<String, List<Serializable>> targetLists = new Hashtable<String, List<Serializable>>();
+	private List<Serializable> targetList;
+	
+	public MockNotificationTarget(DomElement config) {
+		super(config);
+		
+		String name = config.getAttr("name");
+		
+		if(name == null || name.trim().equals("")) {
+			TestCase.fail("Mock NotificationTarget configured incorrectly.  Must specify a 'name' attribute on the NotificationList/target element.");
+		}
+		
+		targetList = getTargetList(name);
+	}
+	
+	public static List<Serializable> getTargetList(String name) {
+		synchronized (targetLists) {
+			List<Serializable> notificationList = targetLists.get(name);
+			
+			// Never return a null list.
+			if(notificationList == null) {
+				notificationList = new ArrayList<Serializable>();
+				targetLists.put(name, notificationList);
+			}
+			
+			return notificationList;
+		}
+	}
+	
+	public static void clearNotifications() {
+		synchronized (targetLists) {
+			targetLists.clear();
+		}
+	}
+
+	@Override
+	public void sendNotification(Serializable notificationObject) throws Exception {
+		targetList.add(notificationObject);
+	}
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java	2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,58 @@
+package org.jboss.soa.esb.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.AbstractPoller;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Simple Mock {@link org.jboss.soa.esb.listeners.AbstractPoller} implementation that can be used for testing.
+ * <p/>
+ * Maintains a static in-memory queue into which objects can be dropped for processing by the configured action handler.
+ * @author tfennelly
+ */
+public class MockPoller extends AbstractPoller {
+
+	// Objects waiting to be handed to the poller.  Also used as the lock guarding add/clear/drain.
+	private static final Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
+	
+	/**
+	 * Public constructor.  Both arguments are passed straight to the {@link AbstractPoller} constructor.
+	 * @param p_oDad The owning listener.
+	 * @param p_oParms The poller configuration.
+	 * @throws Exception If the superclass constructor fails.
+	 */
+	public MockPoller(GpListener p_oDad, DomElement p_oParms) throws Exception {
+		super(p_oDad, p_oParms);
+	}
+	
+	/**
+	 * Add an object for processing by the configured action handler.
+	 * @param actionObject The Object instance for processing.
+	 */
+	public static void addToQueue(Object actionObject) {
+		synchronized (queue) {
+			queue.add(actionObject);
+		}
+	}
+	
+	/**
+	 * Discard every object currently waiting in the queue.
+	 */
+	public static void clearQueue() {
+		synchronized (queue) {
+			queue.clear();
+		}
+	}
+
+	/**
+	 * Drain the queue, returning everything that was waiting in it.
+	 * @return A fixed-size list of the drained objects.  Empty if nothing was queued.
+	 */
+	@Override
+	protected List<Object> pollForCandidates() {
+		// This method is called periodically by the AbstractPoller.
+		synchronized (queue) {
+			// Snapshot and clear under the same lock so an object added through addToQueue
+			// cannot slip in between the two steps and be lost.
+			List<Object> actionObjects = Arrays.asList(queue.toArray());
+			queue.clear();
+			return actionObjects;
+		}
+	}
+
+	/**
+	 * Return the supplied object unchanged.
+	 * @param actionObject An object previously returned by {@link #pollForCandidates()}.
+	 * @return The same object instance.
+	 */
+	@Override
+	protected Object preProcess(Object actionObject) throws Exception {
+		// Called by the AbstractPoller for each of the objects returned to it by the above pollForCandidates method.
+		// Just return the object to be processed by the action class that's configured on the listener.
+		return actionObject;
+	}
+}




More information about the jboss-svn-commits mailing list