[jboss-svn-commits] JBL Code SVN: r6526 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/soa/esb listeners/src/org/jboss/soa/esb/actions listeners/src/org/jboss/soa/esb/listeners listeners/tests/src/org/jboss/soa/esb listeners/tests/src/org/jboss/soa/esb/actions listeners/tests/src/org/jboss/soa/esb/listeners rosetta/src/org/jboss/internal/soa/esb rosetta/src/org/jboss/internal/soa/esb/command rosetta/src/org/jboss/soa/esb rosetta/tests/src/org/jboss/soa/esb rosetta/tests/src/org/jboss/soa/esb/command

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Oct 2 13:58:17 EDT 2006


Author: tfennelly
Date: 2006-10-02 13:57:46 -0400 (Mon, 02 Oct 2006)
New Revision: 6526

Added:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java
   labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/
   labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
Removed:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/command/
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/command/
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml
   labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java
Log:
moved commandqueue code to rosetta internal

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ObjectToFileWriter.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -24,7 +24,6 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -107,10 +106,8 @@
         synchronized(file) {
             File outputFile = getOutputFile();
             FileOutputStream fileOutputStream = null;
-            FileWriter fileWriter = null;
 
             try {
-                fileWriter = new FileWriter(outputFile, append);
                 fileOutputStream = new FileOutputStream(outputFile, append);
             } catch (IOException e) {
                 throw new ActionProcessingException("Action " + actionName + " failed.  Unable to open output file " + outputFile.getAbsolutePath());
@@ -127,7 +124,7 @@
                 throw new ActionProcessingException("Action " + actionName + " failed.  Unable to write to output file " + outputFile.getAbsolutePath(), e);
             } finally {
                 try {
-                    fileWriter.close();
+                	fileOutputStream.close();
                 } catch (IOException e) {
                     logger.warn("Exception on closing file " + file.getAbsolutePath(), e);
                 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -33,11 +33,11 @@
 import java.util.Map;
 
 import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.internal.soa.esb.command.JmsCommandQueue;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.command.CommandQueue;
-import org.jboss.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.command.JmsCommandQueue;
 import org.jboss.soa.esb.common.Configuration;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/actions/ObjectToFileWriterUnitTest.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -73,10 +73,10 @@
         properties.add(new KeyValuePair("file", file.getPath()));
         properties.add(new KeyValuePair("append", "true"));
 
-        // Write something fo file and check it was written..
-        writeAndCheck(A_STRING, A_STRING + A_STRING );
+        // Write something to file and check it was written..
+        writeAndCheck(A_STRING, A_STRING);
         // And do it all again to make sure the contents are appended i.e. the file is not overwritten...
-        writeAndCheck(A_STRING, A_STRING + A_STRING + A_STRING);
+        writeAndCheck(A_STRING, A_STRING + A_STRING);
     }
 
 

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListener-Config-01.xml	2006-10-02 17:57:46 UTC (rev 6526)
@@ -1,5 +1,5 @@
 <EsbConfig
-	command-queue-class="org.jboss.soa.esb.command.InMemoryCommandQueue"
+	command-queue-class="org.jboss.internal.soa.esb.command.InMemoryCommandQueue"
 	command-queue-name="test-queue"
 >
 	

Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/GpListenerUnitTest.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -23,8 +23,8 @@
 
 import java.util.Date;
 
+import org.jboss.internal.soa.esb.command.InMemoryCommandQueue;
 import org.jboss.soa.esb.actions.ToNowhereRouter;
-import org.jboss.soa.esb.command.InMemoryCommandQueue;
 import org.jboss.soa.esb.common.tests.BaseTest;
 import org.jboss.soa.esb.helpers.DomElement;
 import org.jboss.soa.esb.util.ListenersManagerExecThread;

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueue.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,34 @@
+package org.jboss.internal.soa.esb.command;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Command queue abstraction.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface CommandQueue {
+
+	/**
+	 * Open the command queue.
+	 * @param config Command queue configuration.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public void open(DomElement config) throws CommandQueueException;
+	
+	/**
+	 * Receive a message from the underlying queue implementation.
+	 * <p/>
+	 * Performs a blocking receive on the command queue, controled by the receive timeout.
+	 * @param timeout The receive block timeout.  Zero to block indefinitely.
+	 * @return The command message from the queue.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public String receiveCommand(long timeout) throws CommandQueueException;
+	
+	/**
+	 * Close the command queue.
+	 * @throws CommandQueueException Queue exception.  Check for probable chained cause exceptions.
+	 */
+	public void close() throws CommandQueueException;
+}

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/CommandQueueException.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,29 @@
+package org.jboss.internal.soa.esb.command;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Command queue exception.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class CommandQueueException extends BaseException {
+
+	private static final long serialVersionUID = 1L;
+
+	public CommandQueueException() {
+		super();
+	}
+
+	public CommandQueueException(String message) {
+		super(message);
+	}
+
+	public CommandQueueException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CommandQueueException(Throwable cause) {
+		super(cause);
+	}
+}

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/InMemoryCommandQueue.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,86 @@
+package org.jboss.internal.soa.esb.command;
+
+import java.util.Hashtable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * In Memory Blocking Command Queue.
+ * <p/>
+ * Suitable for testing or any other purpose.
+ * <p/>
+ * The command queue's configuration needs to specify the
+ * queue name via a "command-queue-name" attribute supplied in the configuration to the
+ * {@link #open(DomElement)} method.  The queues are stored statically and can be accessed via the
+ * {@link #getQueue(String)} method using the queue name.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class InMemoryCommandQueue implements CommandQueue {
+
+	/**
+	 * Command queue name attribute name.
+	 */
+	public static final String COMMAND_QUEUE_NAME = "command-queue-name";
+
+	private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
+	
+	private String name;
+	private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+	
+	public void open(DomElement config) throws CommandQueueException {
+		if(config == null) {
+			throw new IllegalArgumentException("null 'config' arg in method call.");
+		}
+		
+		name = config.getAttr(COMMAND_QUEUE_NAME);
+		if(name == null) {
+			throw new CommandQueueException("Attribute 'command-queue-name' must be specified on the command queue configuration.");
+		}
+		commandQueues.put(name, this);
+	}
+
+	/**
+	 * Add a command to the in-memory command queue.
+     * <p/>
+     * Blocks until the command has been consumed. 
+	 * @param command The command string.
+	 */
+	public void addCommand(String command) {
+		queue.add(command);
+        while(!queue.isEmpty()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+	}
+	
+	public String receiveCommand(long timeout) throws CommandQueueException {
+		if(name == null || !commandQueues.containsKey(name)) {
+			throw new CommandQueueException("Sorry.  Invalid call to 'receiveCommand' method.  Queue is not open!");
+		}
+		
+		try {
+			return queue.take();
+		} catch (InterruptedException e) {
+			throw new CommandQueueException("Error taking command message from command queue.", e);
+		}
+	}
+
+	public void close() throws CommandQueueException {
+		commandQueues.remove(name);
+	}
+	
+	/**
+	 * Get the command queue based on the name supplied in the configuration ("command-queue-name"). 
+	 * @param name The name of the queue ala the "command-queue-name" attribute on the queue configuration.
+	 * @return The MockCommandQueue instance, or null if no such queue exists.
+	 */
+	public static InMemoryCommandQueue getQueue(String name) {
+		return commandQueues.get(name);
+	}
+}

Added: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/command/JmsCommandQueue.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,172 @@
+package org.jboss.internal.soa.esb.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * JMS based Command Queue implementation.
+ * <p/>
+ * This code was simply pulled from the GpListener.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class JmsCommandQueue implements CommandQueue {
+
+	private static Logger logger = Logger.getLogger(JmsCommandQueue.class);
+
+	public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+
+	public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+
+	public static final String COMMAND_JNDI_URL = "commandJndiURL";
+
+	public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+
+	public static final String COMMAND_JNDI_NAME = "commandJndiName";
+
+	public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+	
+	private MessageConsumer m_oCmdSrc;
+
+	private Session m_oJmsSess;
+
+	private Connection m_oJmsConn;
+	
+	public void open(DomElement config) throws CommandQueueException {
+		try {
+			initialiseJMS(config);
+		} catch (Exception e) {
+			throw new CommandQueueException("Failed to initialise JMS Command Queue.", e);
+		}
+	}
+
+	public void close() throws CommandQueueException {
+		if (null != m_oJmsSess) {
+			try {
+				m_oJmsSess.close();
+			} catch (JMSException eS) {/* Tried my best - Just continue */
+			}
+		}
+		if (null != m_oJmsConn) {
+			try {
+				m_oJmsConn.close();
+			} catch (JMSException eC) {/* Tried my best - Just continue */
+			}
+		}
+	}
+
+	public String receiveCommand(long timeout) throws CommandQueueException {
+		try {
+			Message jmsMessage = m_oCmdSrc.receive(timeout);
+			
+			if (null == jmsMessage)
+				return null;
+			if (jmsMessage instanceof TextMessage) {
+				return ((TextMessage)jmsMessage).getText();
+			} else {
+				logger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+			}
+		} catch(Exception e) {
+			throw new CommandQueueException("Exception receiving message from JMS Command Queue.", e);
+		}
+		
+		return null;
+	}
+
+	private void initialiseJMS(DomElement p_oP) throws Exception {
+		// Only check for JMS attributes if a queue JNDI name was specified
+		String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+		if (!Util.isNullString(sJndiName)) {
+			Map<String, Object> oNewAtts = new HashMap<String, Object>();
+
+			oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
+
+			String sJndiType = obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
+			oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
+			String sJndiURL = obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
+			oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
+			Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+					sJndiURL);
+
+			String sFactClass = obtainAtt(p_oP, COMMAND_CONN_FACTORY,
+					"ConnectionFactory");
+			oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
+			if (Util.isNullString(sFactClass))
+				sFactClass = "ConnectionFactory";
+			Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+			String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+			if (null != sMsgSelector)
+				oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
+
+			boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,
+					COMMAND_IS_TOPIC, "false"));
+			if (bIsTopic) {
+				TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
+				TopicConnection oTC = tcf.createTopicConnection();
+				Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+				TopicSession oSess = oTC.createTopicSession(false,
+						TopicSession.AUTO_ACKNOWLEDGE);
+				m_oJmsConn = oTC;
+				m_oJmsSess = oSess;
+				oTC.start();
+				m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
+			} else {
+				QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
+				QueueConnection oQC = qcf.createQueueConnection();
+				javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
+						.lookup(sJndiName);
+				QueueSession oSess = oQC.createQueueSession(false,
+						TopicSession.AUTO_ACKNOWLEDGE);
+				oQC.start();
+				m_oJmsConn = oQC;
+				m_oJmsSess = oSess;
+				m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+			}
+		}
+	}
+
+	/**
+	 * Find an attribute in the tree (arg 0) or assign default value (arg 2)
+	 * 
+	 * @param p_oP
+	 *            DomElement - look for attributes in this Element only
+	 * @param p_sAtt
+	 *            String - Name of attribute to find
+	 * @param p_sDefault
+	 *            String -default value if requested attribute is not there
+	 * @return String - value of attribute, or default value (if null)
+	 * @throws Exception -
+	 *             If requested attribute not found and no default value
+	 *             supplied by invoker
+	 */
+	private String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+			throws ConfigurationException {
+		String sVal = p_oP.getAttr(p_sAtt);
+		if ((null == sVal) && (null == p_sDefault))
+			throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
+
+		return (null != sVal) ? sVal : p_sDefault;
+	} // ________________________________
+}

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/ConfigurationException.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -29,7 +29,9 @@
  */
 public class ConfigurationException extends BaseException {
 
-    /**
+	private static final long serialVersionUID = 1L;
+
+	/**
      * Construct an exception instance. 
      * @param message Exception message.
      */

Added: labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java	2006-10-02 16:54:15 UTC (rev 6525)
+++ labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/soa/esb/command/InMemoryCommandQueueUnitTest.java	2006-10-02 17:57:46 UTC (rev 6526)
@@ -0,0 +1,124 @@
+package org.jboss.soa.esb.command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.internal.soa.esb.command.CommandQueue;
+import org.jboss.internal.soa.esb.command.CommandQueueException;
+import org.jboss.internal.soa.esb.command.InMemoryCommandQueue;
+import org.jboss.soa.esb.helpers.DomElement;
+
+import junit.framework.TestCase;
+
+public class InMemoryCommandQueueUnitTest extends TestCase {
+
+	public void test_args() throws CommandQueueException {
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+		
+		try {
+			commandQueue.open(null);
+			fail("Expected IllegalArgumentException.");
+		} catch (IllegalArgumentException e) {
+			// OK
+		}
+
+		DomElement config = new DomElement("config");
+		try {
+			commandQueue.open(config);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+	}
+	
+	public void test_queue_open_close() throws CommandQueueException {
+		DomElement config = new DomElement("config");
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+		config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+		assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+		commandQueue.open(config);
+		assertEquals(commandQueue, InMemoryCommandQueue.getQueue("test-queue"));
+		commandQueue.close();
+		assertEquals(null, InMemoryCommandQueue.getQueue("test-queue"));
+	}
+	
+	public void test_queue_receive() throws CommandQueueException, InterruptedException {
+		DomElement config = new DomElement("config");
+		InMemoryCommandQueue commandQueue = new InMemoryCommandQueue();
+
+		// receive should fail if the queue hasn't been opened yet...
+		try {
+			commandQueue.receiveCommand(0);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+		
+		config.setAttr(InMemoryCommandQueue.COMMAND_QUEUE_NAME, "test-queue");
+		commandQueue.open(config);
+		
+		// Start the consumer thread - it will receive the commands from the queue.
+		CommandConsumerThread consumerThread = new CommandConsumerThread(commandQueue);
+		consumerThread.start();
+		
+		// Make sure the thread is running.
+		assertTrue(consumerThread.isRunning);
+		
+		commandQueue.addCommand("command1");
+		assertCommandReceived(consumerThread, "command1", 0);
+		commandQueue.addCommand("command2");
+		assertCommandReceived(consumerThread, "command2", 1);
+		commandQueue.addCommand("command3");
+		assertCommandReceived(consumerThread, "command3", 2);
+		
+		// Stop the queue thread...
+		commandQueue.addCommand("stop");
+		Thread.sleep(50);
+		assertTrue(!consumerThread.isRunning);  // this flag being reset proves the stop command was consumed and so the queue is really working
+		assertEquals(4, consumerThread.unblockCount); // Should have unblocked 4 times - once for each command.
+		
+		// receive should fail if the queue has been closed...
+		commandQueue.close();
+		try {
+			commandQueue.receiveCommand(0);
+			fail("Expected CommandQueueException.");
+		} catch (CommandQueueException e) {
+			// OK
+		}
+	}
+	
+	private void assertCommandReceived(CommandConsumerThread consumerThread, String expected, int index) throws InterruptedException {
+		Thread.sleep(50);
+		assertEquals("Received commands queue is not the expected length.", index + 1, consumerThread.commandsReceived.size());
+		assertEquals("Command not found as last added command.", expected, consumerThread.commandsReceived.get(index));
+	}
+
+	private class CommandConsumerThread extends Thread {
+
+		private List<String> commandsReceived = new ArrayList<String>();
+		private CommandQueue commandQueue;
+		private boolean isRunning = true;
+		private int unblockCount = 0;
+		
+		private CommandConsumerThread(CommandQueue commandQueue) {
+			this.commandQueue = commandQueue;
+		}
+		
+		@Override
+		public void run() {
+			String command = null;
+			
+			while(!"stop".equals(command)) {
+				try {
+					command = commandQueue.receiveCommand(0);
+					commandsReceived.add(command);
+				} catch (CommandQueueException e) {
+					fail("CommandQueue Exception: " + e.getMessage());
+				}
+				unblockCount++;
+			}
+			isRunning = false;
+		}		
+	}
+}




More information about the jboss-svn-commits mailing list