[jboss-svn-commits] JBL Code SVN: r6526 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/tests/src/org/jboss/soa/esb listeners/tests/src/org/jboss/soa/esb/actions listeners/tests/src/org/jboss/soa/esb/listeners rosetta/src/org/jboss/internal/soa/esb rosetta/src/org/jboss/internal/soa/esb/command rosetta/src/org/jboss/soa/esb rosetta/tests/src/org/jboss/soa/esb rosetta/tests/src/org/jboss/soa/esb/command
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Oct 2 13:58:17 EDT 2006
Author: tfennelly
Date: 2006-10-02 13:57:46 -0400 (Mon, 02 Oct 2006)
New Revision: 6526
Added:
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java
labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/
labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
Removed:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/command/
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/command/
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java
Log:
moved commandqueue code to rosetta internal
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -24,7 +24,6 @@
import java.io.File;
import java.io.FileOutputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -107,10 +106,8 @@
synchronized(file) {
File outputFile = getOutputFile();
FileOutputStream fileOutputStream = null;
- FileWriter fileWriter = null;
try {
- fileWriter = new FileWriter(outputFile, append);
fileOutputStream = new FileOutputStream(outputFile, append);
} catch (IOException e) {
throw new ActionProcessingException("Action " + actionName + " failed. Unable to open output file " + outputFile.getAbsolutePath());
@@ -127,7 +124,7 @@
throw new ActionProcessingException("Action " + actionName + " failed. Unable to write to output file " + outputFile.getAbsolutePath(), e);
} finally {
try {
- fileWriter.close();
+ fileOutputStream.close();
} catch (IOException e) {
logger.warn("Exception on closing file " + file.getAbsolutePath(), e);
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -33,11 +33,11 @@
import java.util.Map;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.internal.soa.esb.command.JmsCommandQueue;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.command.CommandQueue;
-import org.jboss.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.command.JmsCommandQueue;
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -73,10 +73,10 @@
properties.add(new KeyValuePair("file", file.getPath()));
properties.add(new KeyValuePair("append", "true"));
- // Write something fo file and check it was written..
- writeAndCheck(A_STRING, A_STRING + A_STRING );
+ // Write something to file and check it was written..
+ writeAndCheck(A_STRING, A_STRING);
// And do it all again to make sure the contents are appended i.e. the file is not overwritten...
- writeAndCheck(A_STRING, A_STRING + A_STRING + A_STRING);
+ writeAndCheck(A_STRING, A_STRING + A_STRING);
}
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml 2006-10-02 17:57:46 UTC (rev 6526)
@@ -1,5 +1,5 @@
<EsbConfig
- command-queue-class="org.jboss.soa.esb.command.InMemoryCommandQueue"
+ command-queue-class="org.jboss.internal.soa.esb.command.InMemoryCommandQueue"
command-queue-name="test-queue"
>
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -23,8 +23,8 @@
import java.util.Date;
+import org.jboss.internal.soa.esb.command.InMemoryCommandQueue;
import org.jboss.soa.esb.actions.ToNowhereRouter;
-import org.jboss.soa.esb.command.InMemoryCommandQueue;
import org.jboss.soa.esb.common.tests.BaseTest;
import org.jboss.soa.esb.helpers.DomElement;
import org.jboss.soa.esb.util.ListenersManagerExecThread;
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,34 @@
+package org.jboss.internal.soa.esb.command;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Command queue abstraction.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface CommandQueue {
+
+ /**
+ * Open the command queue.
+ * @param config Command queue configuration.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void open(DomElement config) throws CommandQueueException;
+
+ /**
+ * Receive a message from the underlying queue implementation.
+ * <p/>
+ * Performs a blocking receive on the command queue, controlled by the receive timeout.
+ * @param timeout The receive block timeout. Zero to block indefinitely.
+ * @return The command message from the queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public String receiveCommand(long timeout) throws CommandQueueException;
+
+ /**
+ * Close the command queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void close() throws CommandQueueException;
+}
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,29 @@
+package org.jboss.internal.soa.esb.command;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Command queue exception.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class CommandQueueException extends BaseException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CommandQueueException() {
+ super();
+ }
+
+ public CommandQueueException(String message) {
+ super(message);
+ }
+
+ public CommandQueueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CommandQueueException(Throwable cause) {
+ super(cause);
+ }
+}
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,86 @@
+package org.jboss.internal.soa.esb.command;
+
+import java.util.Hashtable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * In Memory Blocking Command Queue.
+ * <p/>
+ * Suitable for testing or any other purpose.
+ * <p/>
+ * The command queue's configuration needs to specify the
+ * queue name via a "command-queue-name" attribute supplied in the configuration to the
+ * {@link #open(DomElement)} method. The queues are stored statically and can be accessed via the
+ * {@link #getQueue(String)} method using the queue name.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class InMemoryCommandQueue implements CommandQueue {
+
+ /**
+ * Command queue name attribute name.
+ */
+ public static final String COMMAND_QUEUE_NAME = "command-queue-name";
+
+ private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
+
+ private String name;
+ private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ public void open(DomElement config) throws CommandQueueException {
+ if(config == null) {
+ throw new IllegalArgumentException("null 'config' arg in method call.");
+ }
+
+ name = config.getAttr(COMMAND_QUEUE_NAME);
+ if(name == null) {
+ throw new CommandQueueException("Attribute 'command-queue-name' must be specified on the command queue configuration.");
+ }
+ commandQueues.put(name, this);
+ }
+
+ /**
+ * Add a command to the in-memory command queue.
+ * <p/>
+ * Blocks until the command has been consumed.
+ * @param command The command string.
+ */
+ public void addCommand(String command) {
+ queue.add(command);
+ while(!queue.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ if(name == null || !commandQueues.containsKey(name)) {
+ throw new CommandQueueException("Sorry. Invalid call to 'receiveCommand' method. Queue is not open!");
+ }
+
+ try {
+ return queue.take();
+ } catch (InterruptedException e) {
+ throw new CommandQueueException("Error taking command message from command queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ commandQueues.remove(name);
+ }
+
+ /**
+ * Get the command queue based on the name supplied in the configuration ("command-queue-name").
+ * @param name The name of the queue, as given by the "command-queue-name" attribute on the queue configuration.
+ * @return The InMemoryCommandQueue instance, or null if no such queue exists.
+ */
+ public static InMemoryCommandQueue getQueue(String name) {
+ return commandQueues.get(name);
+ }
+}
Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,172 @@
+package org.jboss.internal.soa.esb.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * JMS based Command Queue implementation.
+ * <p/>
+ * This code was simply pulled from the GpListener.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class JmsCommandQueue implements CommandQueue {
+
+ private static Logger logger = Logger.getLogger(JmsCommandQueue.class);
+
+ public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+
+ public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+
+ public static final String COMMAND_JNDI_URL = "commandJndiURL";
+
+ public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+
+ public static final String COMMAND_JNDI_NAME = "commandJndiName";
+
+ public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+
+ private MessageConsumer m_oCmdSrc;
+
+ private Session m_oJmsSess;
+
+ private Connection m_oJmsConn;
+
+ public void open(DomElement config) throws CommandQueueException {
+ try {
+ initialiseJMS(config);
+ } catch (Exception e) {
+ throw new CommandQueueException("Failed to initialise JMS Command Queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ if (null != m_oJmsSess) {
+ try {
+ m_oJmsSess.close();
+ } catch (JMSException eS) {/* Tried my best - Just continue */
+ }
+ }
+ if (null != m_oJmsConn) {
+ try {
+ m_oJmsConn.close();
+ } catch (JMSException eC) {/* Tried my best - Just continue */
+ }
+ }
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ try {
+ Message jmsMessage = m_oCmdSrc.receive(timeout);
+
+ if (null == jmsMessage)
+ return null;
+ if (jmsMessage instanceof TextMessage) {
+ return ((TextMessage)jmsMessage).getText();
+ } else {
+ logger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+ }
+ } catch(Exception e) {
+ throw new CommandQueueException("Exception receiving message from JMS Command Queue.", e);
+ }
+
+ return null;
+ }
+
+ private void initialiseJMS(DomElement p_oP) throws Exception {
+ // Only check for JMS attributes if a queue JNDI name was specified
+ String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+ if (!Util.isNullString(sJndiName)) {
+ Map<String, Object> oNewAtts = new HashMap<String, Object>();
+
+ oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
+
+ String sJndiType = obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
+ oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
+ String sJndiURL = obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
+ oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+ sJndiURL);
+
+ String sFactClass = obtainAtt(p_oP, COMMAND_CONN_FACTORY,
+ "ConnectionFactory");
+ oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
+ if (Util.isNullString(sFactClass))
+ sFactClass = "ConnectionFactory";
+ Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+ String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+ if (null != sMsgSelector)
+ oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
+
+ boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,
+ COMMAND_IS_TOPIC, "false"));
+ if (bIsTopic) {
+ TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
+ TopicConnection oTC = tcf.createTopicConnection();
+ Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+ TopicSession oSess = oTC.createTopicSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ m_oJmsConn = oTC;
+ m_oJmsSess = oSess;
+ oTC.start();
+ m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
+ } else {
+ QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
+ QueueConnection oQC = qcf.createQueueConnection();
+ javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
+ .lookup(sJndiName);
+ QueueSession oSess = oQC.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ oQC.start();
+ m_oJmsConn = oQC;
+ m_oJmsSess = oSess;
+ m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+ }
+ }
+ }
+
+ /**
+ * Find an attribute in the tree (arg 0) or assign default value (arg 2)
+ *
+ * @param p_oP
+ * DomElement - look for attributes in this Element only
+ * @param p_sAtt
+ * String - Name of attribute to find
+ * @param p_sDefault
+ * String -default value if requested attribute is not there
+ * @return String - value of attribute, or default value (if null)
+ * @throws ConfigurationException -
+ * If requested attribute not found and no default value
+ * supplied by invoker
+ */
+ private String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+ throws ConfigurationException {
+ String sVal = p_oP.getAttr(p_sAtt);
+ if ((null == sVal) && (null == p_sDefault))
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -29,7 +29,9 @@
*/
public class ConfigurationException extends BaseException {
- /**
+ private static final long serialVersionUID = 1L;
+
+ /**
* Construct an exception instance.
* @param message Exception message.
*/
Added: labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java 2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java 2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,124 @@
+package org.jboss.soa.esb.command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.internal.soa.esb.command.InMemoryCommandQueue;
+import org.jboss.soa.esb.helpers.DomElement;
+
+import junit.framework.TestCase;
+
+public class InMemoryCommandQueueUnitTest extends TestCase {
+
+ public void test_args() throws CommandQueueException {
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ try {
+ commandQueue.open(null);
+ fail("Expected IllegalArgumentException.");
+ } catch (IllegalArgumentException e) {
+ // OK
+ }
+
+ DomElement config = new DomElement("config");
+ try {
+ commandQueue.open(config);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+ }
+
+ public void test_queue_open_close() throws CommandQueueException {
+ DomElement config = new DomElement("config");
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+ assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+ commandQueue.open(config);
+ assertEquals(commandQueue, InMemoryCommandQueue.getQueue("test-queue"));
+ commandQueue.close();
+ assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+ }
+
+ public void test_queue_receive() throws CommandQueueException, InterruptedException {
+ DomElement config = new DomElement("config");
+ InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+ // receive should fail if the queue hasn't been opened yet...
+ try {
+ commandQueue.receiveCommand(0);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+
+ config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+ commandQueue.open(config);
+
+ // Start the consumer thread - it will receive the commands from the queue.
+ CommandConsumerThread consumerThread = new CommandConsumerThread(commandQueue);
+ consumerThread.start();
+
+ // Make sure the thread is running.
+ assertTrue(consumerThread.isRunning);
+
+ commandQueue.addCommand("command1");
+ assertCommandReceived(consumerThread, "command1", 0);
+ commandQueue.addCommand("command2");
+ assertCommandReceived(consumerThread, "command2", 1);
+ commandQueue.addCommand("command3");
+ assertCommandReceived(consumerThread, "command3", 2);
+
+ // Stop the queue thread...
+ commandQueue.addCommand("stop");
+ Thread.sleep(50);
+ assertTrue(!consumerThread.isRunning); // this flag being reset proves the stop command was consumed and so the queue is really working
+ assertEquals(4, consumerThread.unblockCount); // Should have unblocked 4 times - once for each command.
+
+ // receive should fail if the queue has been closed...
+ commandQueue.close();
+ try {
+ commandQueue.receiveCommand(0);
+ fail("Expected CommandQueueException.");
+ } catch (CommandQueueException e) {
+ // OK
+ }
+ }
+
+ private void assertCommandReceived(CommandConsumerThread consumerThread, String expected, int index) throws InterruptedException {
+ Thread.sleep(50);
+ assertEquals("Received commands queue is not the expected length.", index + 1, consumerThread.commandsReceived.size());
+ assertEquals("Command not found as last added command.", expected, consumerThread.commandsReceived.get(index));
+ }
+
+ private class CommandConsumerThread extends Thread {
+
+ private List<String> commandsReceived = new ArrayList<String>();
+ private CommandQueue commandQueue;
+ private boolean isRunning = true;
+ private int unblockCount = 0;
+
+ private CommandConsumerThread(CommandQueue commandQueue) {
+ this.commandQueue = commandQueue;
+ }
+
+ @Override
+ public void run() {
+ String command = null;
+
+ while(!"stop".equals(command)) {
+ try {
+ command = commandQueue.receiveCommand(0);
+ commandsReceived.add(command);
+ } catch (CommandQueueException e) {
+ fail("CommandQueue Exception: " + e.getMessage());
+ }
+ unblockCount++;
+ }
+ isRunning = false;
+ }
+ }
+}
More information about the jboss-svn-commits
mailing list