[jboss-svn-commits] JBL Code SVN: r6109 - in labs/jbossesb/workspace/tfennelly/product/core/listeners: src/org/jboss/soa/esb src/org/jboss/soa/esb/actions src/org/jboss/soa/esb/command src/org/jboss/soa/esb/listeners tests/src/org/jboss/soa/esb tests/src/org/jboss/soa/esb/actions tests/src/org/jboss/soa/esb/command tests/src/org/jboss/soa/esb/util
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Sep 7 12:54:29 EDT 2006
Author: tfennelly
Date: 2006-09-07 12:54:21 -0400 (Thu, 07 Sep 2006)
New Revision: 6109
Added:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
Modified:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
Log:
Add the CommandQueue abstraction + the Jms and InMemory impls.
Started adding the TransformAction stuff.
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,29 @@
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+public class TransformAction extends AbstractAction {
+
+ public TransformAction(DomElement actionConfig, Object p_oCurr) {
+ super(actionConfig, p_oCurr);
+
+ System.out.println(actionConfig.toString());
+ }
+
+ @Override
+ public void processCurrentObject() throws Exception {
+
+ }
+
+ @Override
+ public Serializable getOkNotification() {
+ return "OK";
+ }
+
+ @Override
+ public Serializable getErrorNotification() {
+ return "Error";
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,33 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Command queue abstraction.
+ * @author tfennelly
+ */
+public interface CommandQueue {
+
+ /**
+ * Open the command queue.
+ * @param config Command queue configuration.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void open(DomElement config) throws CommandQueueException;
+
+ /**
+ * Receive a message from the underlying queue implementation.
+ * <p/>
+ * Performs a blocking receive on the command queue, controled by the receive timeout.
+ * @param timeout The receive block timeout. Zero to block indefinitely.
+ * @return The command message from the queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public String receiveCommand(long timeout) throws CommandQueueException;
+
+ /**
+ * Close the command queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void close() throws CommandQueueException;
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,28 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Command queue exception.
+ * @author tfennelly
+ */
+public class CommandQueueException extends BaseException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CommandQueueException() {
+ super();
+ }
+
+ public CommandQueueException(String message) {
+ super(message);
+ }
+
+ public CommandQueueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CommandQueueException(Throwable cause) {
+ super(cause);
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,76 @@
+package org.jboss.soa.esb.command;
+
+import java.util.Hashtable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * In Memory Blocking Command Queue.
+ * <p/>
+ * Suitable for testing or any other purpose.
+ * <p/>
+ * The command queue's configuration needs to specify the
+ * queue name via a "command-queue-name" attribute supplied in the configuration to the
+ * {@link #open(DomElement)} method. The queues are stored statically and can be accessed via the
+ * {@link #getQueue(String)} method using the queue name.
+ * @author tfennelly
+ */
+public class InMemoryCommandQueue implements CommandQueue {
+
+ /**
+ * Command queue name attribute name.
+ */
+ public static final String COMMAND_QUEUE_NAME = "command-queue-name";
+
+ private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
+
+ private String name;
+ private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ public void open(DomElement config) throws CommandQueueException {
+ if(config == null) {
+ throw new IllegalArgumentException("null 'config' arg in method call.");
+ }
+
+ name = config.getAttr(COMMAND_QUEUE_NAME);
+ if(name == null) {
+ throw new CommandQueueException("Attribute 'command-queue-name' must be specified on the command queue configuration.");
+ }
+ commandQueues.put(name, this);
+ }
+
+ /**
+ * Add a command to the in-memory command queue.
+ * @param command The command string.
+ */
+ public void addCommand(String command) {
+ queue.add(command);
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ if(name == null || !commandQueues.containsKey(name)) {
+ throw new CommandQueueException("Sorry. Invalid call to 'receiveCommand' method. Queue is not open!");
+ }
+
+ try {
+ return queue.take();
+ } catch (InterruptedException e) {
+ throw new CommandQueueException("Error taking command message from command queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ commandQueues.remove(name);
+ }
+
+ /**
+ * Get the command queue based on the name supplied in the configuration ("command-queue-name").
+ * @param name The name of the queue ala the "command-queue-name" attribute on the queue configuration.
+ * @return The MockCommandQueue instance, or null if no such queue exists.
+ */
+ public static InMemoryCommandQueue getQueue(String name) {
+ return commandQueues.get(name);
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,148 @@
+package org.jboss.soa.esb.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * JMS based Command Queue implementation.
+ * <p/>
+ * This code was simply pulled from the GpListener.
+ * @author tfennelly
+ */
+public class JmsCommandQueue implements CommandQueue {
+
+ private static Logger logger = Logger.getLogger(JmsCommandQueue.class);
+
+ public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+
+ public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+
+ public static final String COMMAND_JNDI_URL = "commandJndiURL";
+
+ public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+
+ public static final String COMMAND_JNDI_NAME = "commandJndiName";
+
+ public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+
+ private MessageConsumer m_oCmdSrc;
+
+ private Session m_oJmsSess;
+
+ private Connection m_oJmsConn;
+
+ public void open(DomElement config) throws CommandQueueException {
+ try {
+ initialiseJMS(config);
+ } catch (Exception e) {
+ throw new CommandQueueException("Failed to initialise JMS Command Queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ if (null != m_oJmsSess) {
+ try {
+ m_oJmsSess.close();
+ } catch (JMSException eS) {/* Tried my best - Just continue */
+ }
+ }
+ if (null != m_oJmsConn) {
+ try {
+ m_oJmsConn.close();
+ } catch (JMSException eC) {/* Tried my best - Just continue */
+ }
+ }
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ try {
+ Message jmsMessage = m_oCmdSrc.receive(timeout);
+
+ if (null == jmsMessage)
+ return null;
+ if (jmsMessage instanceof TextMessage) {
+ return ((TextMessage)jmsMessage).getText();
+ } else {
+ logger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+ }
+ } catch(Exception e) {
+ throw new CommandQueueException("Exception receiving message from JMS Command Queue.", e);
+ }
+
+ return null;
+ }
+
+ private void initialiseJMS(DomElement p_oP) throws Exception {
+ // Only check for JMS attributes if a queue JNDI name was specified
+ String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+ if (!Util.isNullString(sJndiName)) {
+ Map<String, Object> oNewAtts = new HashMap<String, Object>();
+
+ oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
+
+ String sJndiType = GpListener.obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
+ oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
+ String sJndiURL = GpListener.obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
+ oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+ sJndiURL);
+
+ String sFactClass = GpListener.obtainAtt(p_oP, COMMAND_CONN_FACTORY,
+ "ConnectionFactory");
+ oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
+ if (Util.isNullString(sFactClass))
+ sFactClass = "ConnectionFactory";
+ Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+ String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+ if (null != sMsgSelector)
+ oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
+
+ boolean bIsTopic = Boolean.parseBoolean(GpListener.obtainAtt(p_oP,
+ COMMAND_IS_TOPIC, "false"));
+ if (bIsTopic) {
+ TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
+ TopicConnection oTC = tcf.createTopicConnection();
+ Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+ TopicSession oSess = oTC.createTopicSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ m_oJmsConn = oTC;
+ m_oJmsSess = oSess;
+ oTC.start();
+ m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
+ } else {
+ QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
+ QueueConnection oQC = qcf.createQueueConnection();
+ javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
+ .lookup(sJndiName);
+ QueueSession oSess = oQC.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ oQC.start();
+ m_oJmsConn = oQC;
+ m_oJmsSess = oSess;
+ m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+ }
+ }
+ }
+}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -22,22 +22,28 @@
package org.jboss.soa.esb.listeners;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
-import java.util.*;
-import java.lang.reflect.*;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
-import javax.jms.*;
-import javax.naming.*;
-
-import org.apache.log4j.*;
-
+import org.apache.log4j.Logger;
import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.command.CommandQueue;
+import org.jboss.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.command.JmsCommandQueue;
import org.jboss.soa.esb.common.SystemProperties;
-import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.helpers.DomElement;
import org.jboss.soa.esb.notification.NotificationList;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.InotificationHandler;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXException;
@@ -83,18 +89,6 @@
// parameter reloads
;
- public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
-
- public static final String COMMAND_JNDI_TYPE = "commandJndiType";
-
- public static final String COMMAND_JNDI_URL = "commandJndiURL";
-
- public static final String COMMAND_IS_TOPIC = "commandIsTopic";
-
- public static final String COMMAND_JNDI_NAME = "commandJndiName";
-
- public static final String COMMAND_MSG_SELECTOR = "messageSelector";
-
public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
public static final String PARM_END_TIME = "endTime";
@@ -164,12 +158,8 @@
}
};
- private MessageConsumer m_oCmdSrc;
+ private CommandQueue commandQueue;
- private Session m_oJmsSess;
-
- private Connection m_oJmsConn;
-
/**
* Construct a Listener Manager from the named repository based
* configuration.
@@ -253,66 +243,18 @@
public void checkParms(DomElement p_oP) throws Exception {
// We've just loaded - set to false until next reload requested
m_bReloadRequested = false;
- m_oCmdSrc = null;
+ commandQueue = createCommandQueue(p_oP);
- Map<String, Object> oNewAtts = new HashMap<String, Object>();
+ // Open the command queue...
+ commandQueue.open(p_oP);
- // Only check for JMS attributes if a queue JNDI name was specified
- String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
- if (!Util.isNullString(sJndiName)) {
- oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
-
- String sJndiType = obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
- oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
- String sJndiURL = obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
- oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
- sJndiURL);
-
- String sFactClass = obtainAtt(p_oP, COMMAND_CONN_FACTORY,
- "ConnectionFactory");
- oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
- if (Util.isNullString(sFactClass))
- sFactClass = "ConnectionFactory";
- Object oFactCls = oJndiCtx.lookup(sFactClass);
-
- String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
- if (null != sMsgSelector)
- oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
-
- boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,
- COMMAND_IS_TOPIC, "false"));
- if (bIsTopic) {
- TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
- TopicConnection oTC = tcf.createTopicConnection();
- Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
- TopicSession oSess = oTC.createTopicSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- m_oJmsConn = oTC;
- m_oJmsSess = oSess;
- oTC.start();
- m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
- } else {
- QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
- QueueConnection oQC = qcf.createQueueConnection();
- javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
- .lookup(sJndiName);
- QueueSession oSess = oQC.createQueueSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- oQC.start();
- m_oJmsConn = oQC;
- m_oJmsSess = oSess;
- m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
- }
- }
-
// if PARM_RELOAD_SECS not set, and no command queue
// then reload every 10 minutes
// If there is a command queue, run until command is received
String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
m_lNextReload = (null != sRldSecs) ? System.currentTimeMillis() + 1000
* Long.parseLong(sRldSecs)
- : (null == m_oCmdSrc) ? Long.MAX_VALUE : System
+ : (null == commandQueue) ? Long.MAX_VALUE : System
.currentTimeMillis()
+ m_iDfltReloadMillis;
@@ -325,6 +267,21 @@
} // ________________________________
+ private CommandQueue createCommandQueue(DomElement config) {
+ String commandQueueClass = config.getAttr("command-queue-class");
+
+ if(commandQueueClass != null) {
+ try {
+ return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+ } catch (Exception e) {
+ m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to the JMS Command Queue", e);
+ }
+ }
+
+ // Default command queue...
+ return new JmsCommandQueue();
+ }
+
/**
* Main execution loop <p/> Will continue to run until either <p/>a) run
* time is expired <p/>b) quiesce command is received in command queue
@@ -373,16 +330,12 @@
m_oLogger
.info("Finishing_____________________________________________________");
- if (null != m_oJmsSess)
- try {
- m_oJmsSess.close();
- } catch (JMSException eS) {/* Tried my best - Just continue */
- }
- if (null != m_oJmsConn)
- try {
- m_oJmsConn.close();
- } catch (JMSException eC) {/* Tried my best - Just continue */
- }
+ // Close the command queue...
+ try {
+ commandQueue.close();
+ } catch (CommandQueueException e) {
+ m_oLogger.error("Error closing Command Queue.", e);
+ }
} // ________________________________
private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
@@ -405,7 +358,7 @@
private void waitForCmdOrSleep() {
long lToGo = millisToWait();
- if (null == m_oCmdSrc) {
+ if (null == commandQueue) {
m_oLogger.debug("About to sleep " + lToGo);
// No command queue nor topic - Just sleep until time
// exhausted, or thread interrupted
@@ -422,22 +375,17 @@
// that's why time to go is recalculated on each cycle
while ((lToGo = millisToWait()) > 0) {
try {
- m_oLogger.info("Waiting for command ... timeout=" + lToGo
- + " millis");
- // for the time being, only text messages allowed
- // THIS WILL CHANGE !!
- Message oM = m_oCmdSrc.receive(lToGo);
- if (null == oM)
+ m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
return;
- if (!(oM instanceof TextMessage)) {
- m_oLogger
- .warn("Message in command queue IGNORED - should be instanceof TextMessage");
- return;
}
- processCommand((TextMessage) oM);
- if (endRequested() || timeToReload())
+ processCommand(oM);
+ if (endRequested() || timeToReload()) {
break;
- } catch (JMSException eJ) {
+ }
+ } catch (CommandQueueException eJ) {
m_oLogger.info("receive on command queue failed", eJ);
}
}
@@ -472,40 +420,37 @@
* </TABLE> * startsWith() <p/>
*
* @param p_oMsg
- * TextMessage - Received in command queue/topic
+ * Message received from the command queue.
*
*/
- private void processCommand(TextMessage p_oMsg) {
- try {
- String sTxt = p_oMsg.getText();
- if (null == sTxt)
- return;
- String sLow = sTxt.trim().toLowerCase();
- if (sLow.startsWith("shutdown")) {
- m_bEndRequested = true;
- m_oLogger.info("Shutdown has been requested");
- return;
+ private void processCommand(String sTxt) {
+ if (null == sTxt)
+ return;
+
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ m_bEndRequested = true;
+ m_oLogger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param")) {
+ m_bReloadRequested = true;
+ m_oLogger
+ .info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+ m_oLogger.info("New end date set to : " + oEnd);
+ m_lEndTime = oEnd.getTime();
+ } catch (Exception eDat) {
+ m_oLogger.info("Problems with endTime command", eDat);
}
- if (sLow.startsWith("reload param")) {
- m_bReloadRequested = true;
- m_oLogger
- .info("Request for parameter reload has been received");
- return;
- }
- String[] sa = sLow.split("\\s+");
- if (sa.length > 1 && "endtime".equals(sa[0]))
- try {
- String sDate = sa[1];
- String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
- : sa[2];
- Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
- m_oLogger.info("New end date set to : " + oEnd);
- m_lEndTime = oEnd.getTime();
- } catch (Exception eDat) {
- m_oLogger.info("Problems with endTime command", eDat);
- }
- } catch (JMSException eJ) {
- m_oLogger.info("Problems with command queue", eJ);
}
} // ________________________________
@@ -584,7 +529,7 @@
* If requested attribute not found and no default value
* supplied by invoker
*/
- static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+ public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
throws Exception {
String sVal = p_oP.getAttr(p_sAtt);
if ((null == sVal) && (null == p_sDefault))
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListener-Config-01.xml 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,19 @@
+<TransformActionListener
+ command-queue-class="org.jboss.soa.esb.command.InMemoryCommandQueue"
+ command-queue-name="test-queue"
+>
+ <ListenerConfig
+ listenerClass="org.jboss.soa.esb.util.MockPoller"
+ actionClass="org.jboss.soa.esb.actions.TransformAction"
+ maxThreads="1"
+ >
+ <NotificationList type="OK">
+ <target class="org.jboss.soa.esb.util.MockNotificationTarget" name="ok-target" />
+ </NotificationList>
+
+ <NotificationList type="err">
+ <target class="org.jboss.soa.esb.util.MockNotificationTarget" name="err-target" />
+ </NotificationList>
+ </ListenerConfig>
+
+</TransformActionListener>
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionListenerUnitTest.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,18 @@
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.util.ListenersManagerExecThread;
+
+import junit.framework.TestCase;
+
+public class TransformActionListenerUnitTest extends TestCase {
+
+ public void test() throws Exception {
+ DomElement config = DomElement.fromInputStream(getClass().getResourceAsStream("TransformActionListener-Config-01.xml"));
+ GpListener listenerManager = new GpListener(config);
+ ListenersManagerExecThread execThread = new ListenersManagerExecThread(listenerManager);
+
+ // TODO: ... Work in progress...
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/actions/TransformActionUnitTest.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,10 @@
+package org.jboss.soa.esb.actions;
+
+import junit.framework.TestCase;
+
+public class TransformActionUnitTest extends TestCase {
+
+ public void test() {
+ // TODO: ... Work in progress...
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,121 @@
+package org.jboss.soa.esb.command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+import junit.framework.TestCase;
+
+public class InMemoryCommandQueueUnitTest extends TestCase {
+
+ public void test_args() throws CommandQueueException {
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ try {
+ commandQueue.open(null);
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // OK
+ }
+
+ DomElement config = new DomElement("config");
+ try {
+ commandQueue.open(config);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+ }
+
+ public void test_queue_open_close() throws CommandQueueException {
+ DomElement config = new DomElement("config");
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+ assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+ commandQueue.open(config);
+ assertEquals(commandQueue, InMemoryCommandQueue.getQueue("test-queue"));
+ commandQueue.close();
+ assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+ }
+
+ public void test_queue_receive() throws CommandQueueException, InterruptedException {
+ DomElement config = new DomElement("config");
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ // receive should fail if the queue hasn't been opened yet...
+ try {
+ commandQueue.receiveCommand(0);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+
+ config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+ commandQueue.open(config);
+
+ // Start the consumer thread - it will receive the commands from the queue.
+ CommandConsumerThread consumerThread = new CommandConsumerThread(commandQueue);
+ consumerThread.start();
+
+ // Make sure the thread is running.
+ assertTrue(consumerThread.isRunning);
+
+ commandQueue.addCommand("command1");
+ assertCommandReceived(consumerThread, "command1", 0);
+ commandQueue.addCommand("command2");
+ assertCommandReceived(consumerThread, "command2", 1);
+ commandQueue.addCommand("command3");
+ assertCommandReceived(consumerThread, "command3", 2);
+
+ // Stop the queue thread...
+ commandQueue.addCommand("stop");
+ Thread.sleep(50);
+ assertTrue(!consumerThread.isRunning); // this flag being reset proves the stop command was consumed and so the queue is really working
+ assertEquals(4, consumerThread.unblockCount); // Should have unblocked 4 times - once for each command.
+
+ // receive should fail if the queue has been closed...
+ commandQueue.close();
+ try {
+ commandQueue.receiveCommand(0);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+ }
+
+ private void assertCommandReceived(CommandConsumerThread consumerThread, String expected, int index) throws InterruptedException {
+ Thread.sleep(50);
+ assertEquals("Received commands queue is not the expected length.", index + 1, consumerThread.commandsReceived.size());
+ assertEquals("Command not found as last added command.", expected, consumerThread.commandsReceived.get(index));
+ }
+
+ private class CommandConsumerThread extends Thread {
+
+ private List<String> commandsReceived = new ArrayList<String>();
+ private CommandQueue commandQueue;
+ private boolean isRunning = true;
+ private int unblockCount = 0;
+
+ private CommandConsumerThread(CommandQueue commandQueue) {
+ this.commandQueue = commandQueue;
+ }
+
+ @Override
+ public void run() {
+ String command = null;
+
+ while(!"stop".equals(command)) {
+ try {
+ command = commandQueue.receiveCommand(0);
+ commandsReceived.add(command);
+ } catch (CommandQueueException e) {
+ fail("CommandQueue Exception: " + e.getMessage());
+ }
+ unblockCount++;
+ }
+ isRunning = false;
+ }
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/ListenersManagerExecThread.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,90 @@
+package org.jboss.soa.esb.util;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.listeners.GpListener;
+
+public class ListenersManagerExecThread extends Thread {
+
+ private static Logger logger = Logger.getLogger(ListenersManagerExecThread.class);
+ private GpListener listenersManager;
+
+ public ListenersManagerExecThread(GpListener listenersManager) {
+ super(listenersManager);
+ this.listenersManager = listenersManager;
+ }
+
+ @Override
+ public synchronized void start() {
+ logger.info("Waiting on Listener Manager the start...");
+ super.start();
+ while(listenersManager.getState() != GpListener.State.Running) {
+ try {
+ sleep(50);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Unexpected Thread Interrupt exception.", e);
+ }
+ if(listenersManager.getState() == GpListener.State.Exception_thrown) {
+ Exception e = listenersManager.getState().getException();
+ logger.error("Failed to start the Listener Manager!", e);
+ TestCase.fail(e.getMessage());
+ }
+ }
+ logger.info("Listener Manager running (Thread: " + getName() + ")! Note this does not mean all the Listeners are up and running!");
+ }
+
+
+ /**
+ * Get the {@link GpListener} Listeners Manager class executing in this thread.
+ * @return The listenersManager property value.
+ */
+ public GpListener getListenersManager() {
+ return listenersManager;
+ }
+
+ /**
+ * Assert that the listener Manager is in an Exception state..
+ */
+ public void asserttInException() {
+ if(listenersManager.getState() != GpListener.State.Exception_thrown) {
+ String errorMsg = "ListenerManager not in Exception state. Listener Manager Thread: " + this.getName();
+ logger.error(errorMsg);
+ TestCase.fail(errorMsg);
+ }
+ }
+
+ /**
+ * Assert that the listener Manager is not in an Exception state..
+ */
+ public void assertNotInException() {
+ if(listenersManager.getState() == GpListener.State.Exception_thrown) {
+ String errorMsg = "ListenerManager in Exception state. See log. Listener Manager Thread: " + this.getName();
+ logger.error(errorMsg, listenersManager.getState().getException());
+ TestCase.fail(errorMsg);
+ }
+ }
+
+ /**
+ * Assert that the listener Manager has shutdown.
+ * @param maxWait The maximum length of time (ms) to wait for shutdown before failing the test.
+ */
+ public void assertShutdownOK(long maxWait) {
+ long endTime = System.currentTimeMillis() + maxWait;
+
+ while(System.currentTimeMillis() < endTime) {
+ if(listenersManager.getState() == GpListener.State.Done_OK) {
+ logger.info("Shutdown was successful. Listener Manager Thread: " + this.getName());
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ logger.error("Thread interupt...", e);
+ }
+ }
+ String errorMsg = "ListenerManager failed to shutdown as requested. Waited for " + maxWait + "ms. Listener Manager Thread: " + this.getName();
+ logger.error(errorMsg);
+ TestCase.fail(errorMsg);
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockNotificationTarget.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,68 @@
+package org.jboss.soa.esb.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.notification.NotificationTarget;
+
+/**
+ * Mock NotificationTarget Implementation.
+ * <p/>
+ * Configured by giving the target output list a 'name'. Notifications are statically accessed via the static
+ * {@link #getTargetList(String)} method, supplying the target list name.
+ * <p/>
+ * Sample config:
+ * <pre>
+ * <NotificationList type="OK">
+ * <target class="org.jboss.soa.esb.util.MockNotificationTarget" <b>name="ok-target"</b> />
+ * </NotificationList>
+ * </pre>
+ * @author tfennelly
+ */
+public class MockNotificationTarget extends NotificationTarget {
+
+ private static Hashtable<String, List<Serializable>> targetLists = new Hashtable<String, List<Serializable>>();
+ private List<Serializable> targetList;
+
+ public MockNotificationTarget(DomElement config) {
+ super(config);
+
+ String name = config.getAttr("name");
+
+ if(name == null || name.trim().equals("")) {
+ TestCase.fail("Mock NotificationTarget configured incorrectly. Must specify a 'name' attribute on the NotificationList/target element.");
+ }
+
+ targetList = getTargetList(name);
+ }
+
+ public static List<Serializable> getTargetList(String name) {
+ synchronized (targetLists) {
+ List<Serializable> notificationList = targetLists.get(name);
+
+ // Never return a null list.
+ if(notificationList == null) {
+ notificationList = new ArrayList<Serializable>();
+ targetLists.put(name, notificationList);
+ }
+
+ return notificationList;
+ }
+ }
+
+ public static void clearNotifications() {
+ synchronized (targetLists) {
+ targetLists.clear();
+ }
+ }
+
+ @Override
+ public void sendNotification(Serializable notificationObject) throws Exception {
+ targetList.add(notificationObject);
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-09-07 15:59:58 UTC (rev 6108)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/tests/src/org/jboss/soa/esb/util/MockPoller.java 2006-09-07 16:54:21 UTC (rev 6109)
@@ -0,0 +1,58 @@
+package org.jboss.soa.esb.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.AbstractPoller;
+import org.jboss.soa.esb.listeners.GpListener;
+
+/**
+ * Simple Mock {@link org.jboss.soa.esb.listeners.AbstractPoller} implementation that can be used for testing.
+ * <p/>
+ * Maintains a static in-memory queue into which objects can be dropped for processing by the configured action handler.
+ * @author tfennelly
+ */
+public class MockPoller extends AbstractPoller {
+
+ private static Queue queue = new ConcurrentLinkedQueue<Object>();
+
+ public MockPoller(GpListener p_oDad, DomElement p_oParms) throws Exception {
+ super(p_oDad, p_oParms);
+ }
+
+ /**
+ * Add an object for processing by the configured action handler.
+ * @param actionObject The Object instance for processing.
+ */
+ public static void addToQueue(Object actionObject) {
+ synchronized (queue) {
+ queue.add(actionObject);
+ }
+ }
+
+ public static void clearQueue() {
+ synchronized (queue) {
+ queue.clear();
+ }
+ }
+
+ @Override
+ protected List<Object> pollForCandidates() {
+ // This method is called periodically by the AbstractPoller.
+ synchronized (queue) {
+ List<Object> actionObjects = Arrays.asList(queue.toArray());
+ queue.clear();
+ return actionObjects;
+ }
+ }
+
+ @Override
+ protected Object preProcess(Object actionObject) throws Exception {
+ // Called by the AbstractPoller for each of the objects returned to it be the above pollForCandidates method.
+ // Just return the object to be processed by the action class that's configured on the listener.
+ return actionObject;
+ }
+}
More information about the jboss-svn-commits
mailing list