[jboss-cvs] JBoss Messaging SVN: r6093 - in trunk: tests/src/org/jboss/messaging/tests/integration/management and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 16 10:24:05 EDT 2009
Author: jmesnil
Date: 2009-03-16 10:24:04 -0400 (Mon, 16 Mar 2009)
New Revision: 6093
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/core/
trunk/tests/src/org/jboss/messaging/tests/integration/management/core/CoreQueueControlTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/management/DayCounterInfo.java
trunk/src/main/org/jboss/messaging/core/management/MessageCounterInfo.java
trunk/src/main/org/jboss/messaging/core/management/MessageInfo.java
Log:
QueueControl integration tests
* completed QueueControlTest which uses a JMX Bean to manage the queue
* added subclass CoreQueueControlTest which uses core messaging to manage the queue
Modified: trunk/src/main/org/jboss/messaging/core/management/DayCounterInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/DayCounterInfo.java 2009-03-16 13:37:29 UTC (rev 6092)
+++ trunk/src/main/org/jboss/messaging/core/management/DayCounterInfo.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -22,6 +22,10 @@
package org.jboss.messaging.core.management;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.jboss.messaging.core.logging.Logger;
import static javax.management.openmbean.SimpleType.INTEGER;
@@ -123,6 +127,25 @@
}
return data;
}
+
+ public static DayCounterInfo[] from(TabularData data)
+ {
+ Collection values = data.values();
+ List<DayCounterInfo> infos = new ArrayList<DayCounterInfo>();
+ for (Object object : values)
+ {
+ CompositeData compositeData = (CompositeData)object;
+ String date = (String)compositeData.get("date");
+ int[] counters = new int[24];
+ for (int i = 0; i < counters.length; i++)
+ {
+ counters[i] = (Integer)compositeData.get(String.format("%02d", i));
+ }
+ infos.add(new DayCounterInfo(date, counters));
+ }
+
+ return (DayCounterInfo[])infos.toArray(new DayCounterInfo[infos.size()]);
+ }
// Constructors --------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/MessageCounterInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessageCounterInfo.java 2009-03-16 13:37:29 UTC (rev 6092)
+++ trunk/src/main/org/jboss/messaging/core/management/MessageCounterInfo.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -34,7 +34,6 @@
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
-import javax.management.openmbean.TabularType;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.messagecounter.MessageCounter;
@@ -51,35 +50,46 @@
private static final Logger log = Logger.getLogger(MessageCounterInfo.class);
- private static final DateFormat DATE_FORMAT = DateFormat.getDateTimeInstance(DateFormat.SHORT,
- DateFormat.MEDIUM);
-
+ private static final DateFormat DATE_FORMAT = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+
public static final CompositeType TYPE;
+
private static final String MESSAGE_TYPE_NAME = "MessageCounterInfo";
- private static final String MESSAGE_TABULAR_TYPE_NAME = "TabularMessageCounterInfo";
+
private static final String[] ITEM_NAMES = new String[] { "name",
- "subscription", "durable", "count", "countDelta", "depth",
- "depthDelta", "lastAddTimestamp", "updateTimestamp" };
- private static final String[] ITEM_DESCRIPTIONS = new String[] {
- "Name of the Queue", "Name of the subscription",
- "Is the queue durable?", "Message count", "Message count delta",
- "Depth", "Depth delta", "Timestamp of the last added messagg", "Timestamp of the last update" };
+ "subscription",
+ "durable",
+ "count",
+ "countDelta",
+ "depth",
+ "depthDelta",
+ "lastAddTimestamp",
+ "updateTimestamp" };
+
+ private static final String[] ITEM_DESCRIPTIONS = new String[] { "Name of the Queue",
+ "Name of the subscription",
+ "Is the queue durable?",
+ "Message count",
+ "Message count delta",
+ "Depth",
+ "Depth delta",
+ "Timestamp of the last added messagg",
+ "Timestamp of the last update" };
+
private static final OpenType[] TYPES;
- private static final TabularType TABULAR_TYPE;
static
{
try
{
- TYPES = new OpenType[] { STRING, STRING, BOOLEAN, INTEGER, INTEGER,
- INTEGER, INTEGER, STRING, STRING };
+ TYPES = new OpenType[] { STRING, STRING, BOOLEAN, INTEGER, INTEGER, INTEGER, INTEGER, STRING, STRING };
TYPE = new CompositeType(MESSAGE_TYPE_NAME,
- "Information for a MessageCounter", ITEM_NAMES,
- ITEM_DESCRIPTIONS, TYPES);
- TABULAR_TYPE = new TabularType(MESSAGE_TABULAR_TYPE_NAME,
- "Information for Tabular MessageCounter", TYPE,
- new String[] { "name" });
- } catch (OpenDataException e)
+ "Information for a MessageCounter",
+ ITEM_NAMES,
+ ITEM_DESCRIPTIONS,
+ TYPES);
+ }
+ catch (OpenDataException e)
{
log.error("Unable to create open types for a MessageCounter", e);
throw new IllegalStateException(e);
@@ -89,13 +99,21 @@
// Attributes ----------------------------------------------------
private final String name;
+
private final String subscription;
+
private final boolean durable;
+
private final int count;
+
private final int countDelta;
+
private final int depth;
+
private final int depthDelta;
+
private final String lastAddTimestamp;
+
private final String udpateTimestamp;
// Static --------------------------------------------------------
@@ -104,19 +122,52 @@
{
String lassAddTimestamp = DATE_FORMAT.format(new Date(counter.getLastAddedMessageTime()));
String updateTimestamp = DATE_FORMAT.format(new Date(counter.getLastUpdate()));
- MessageCounterInfo info = new MessageCounterInfo(counter
- .getDestinationName(), counter.getDestinationSubscription(),
- counter.isDestinationDurable(), counter.getCount(), counter
- .getCountDelta(), counter.getMessageCount(), counter
- .getMessageCountDelta(), lassAddTimestamp, updateTimestamp);
+ MessageCounterInfo info = new MessageCounterInfo(counter.getDestinationName(),
+ counter.getDestinationSubscription(),
+ counter.isDestinationDurable(),
+ counter.getCount(),
+ counter.getCountDelta(),
+ counter.getMessageCount(),
+ counter.getMessageCountDelta(),
+ lassAddTimestamp,
+ updateTimestamp);
return info.toCompositeData();
}
-
+
+ public static MessageCounterInfo from(CompositeData data)
+ {
+ String name = (String)data.get("name");
+ String subscription = (String)data.get("subscription");
+ boolean durable = (Boolean)data.get("durable");
+ int count = (Integer)data.get("count");
+ int countDelta = (Integer)data.get("countDelta");
+ int depth = (Integer)data.get("depth");
+ int depthDelta = (Integer)data.get("depthDelta");
+ String lastAddTimestamp = (String)data.get("lastAddTimestamp");
+ String updateTimestamp = (String)data.get("updateTimestamp");
+
+ return new MessageCounterInfo(name,
+ subscription,
+ durable,
+ count,
+ countDelta,
+ depth,
+ depthDelta,
+ lastAddTimestamp,
+ updateTimestamp);
+ }
+
// Constructors --------------------------------------------------
- public MessageCounterInfo(final String name, final String subscription,
- final boolean durable, final int count, final int countDelta,
- final int depth, final int depthDelta, final String lastAddTimestamp, final String udpateTimestamp)
+ public MessageCounterInfo(final String name,
+ final String subscription,
+ final boolean durable,
+ final int count,
+ final int countDelta,
+ final int depth,
+ final int depthDelta,
+ final String lastAddTimestamp,
+ final String udpateTimestamp)
{
this.name = name;
this.subscription = subscription;
@@ -137,15 +188,66 @@
{
return new CompositeDataSupport(TYPE, ITEM_NAMES, new Object[] { name,
- subscription, durable, count, countDelta, depth, depthDelta,
- lastAddTimestamp, udpateTimestamp });
- } catch (OpenDataException e)
+ subscription,
+ durable,
+ count,
+ countDelta,
+ depth,
+ depthDelta,
+ lastAddTimestamp,
+ udpateTimestamp });
+ }
+ catch (OpenDataException e)
{
log.error("Unable to create a CompositeData from a MessageCounter", e);
return null;
}
}
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getSubscription()
+ {
+ return subscription;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public int getCount()
+ {
+ return count;
+ }
+
+ public int getCountDelta()
+ {
+ return countDelta;
+ }
+
+ public int getDepth()
+ {
+ return depth;
+ }
+
+ public int getDepthDelta()
+ {
+ return depthDelta;
+ }
+
+ public String getLastAddTimestamp()
+ {
+ return lastAddTimestamp;
+ }
+
+ public String getUdpateTimestamp()
+ {
+ return udpateTimestamp;
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/MessageInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessageInfo.java 2009-03-16 13:37:29 UTC (rev 6092)
+++ trunk/src/main/org/jboss/messaging/core/management/MessageInfo.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -53,33 +53,58 @@
// Constants -----------------------------------------------------
public static final CompositeType TYPE;
+
private static final String MESSAGE_TYPE_NAME = "MessageInfo";
+
private static final String MESSAGE_TABULAR_TYPE_NAME = "MessageTabularInfo";
+
private static final String[] ITEM_NAMES = new String[] { "id",
- "destination", "durable", "timestamp", "type", "size", "priority",
- "expired", "expiration", "properties" };
- private static final String[] ITEM_DESCRIPTIONS = new String[] {
- "Message ID", "destination of the message", "Is the message durable?",
- "Timestamp of the message", "Type of the message",
- "Size of the encoded messag", "Priority of the message",
- "Is the message expired?", "Expiration of the message",
- "Properties of the message" };
+ "destination",
+ "durable",
+ "timestamp",
+ "type",
+ "size",
+ "priority",
+ "expired",
+ "expiration",
+ "properties" };
+
+ private static final String[] ITEM_DESCRIPTIONS = new String[] { "Message ID",
+ "destination of the message",
+ "Is the message durable?",
+ "Timestamp of the message",
+ "Type of the message",
+ "Size of the encoded messag",
+ "Priority of the message",
+ "Is the message expired?",
+ "Expiration of the message",
+ "Properties of the message" };
+
private static final OpenType[] TYPES;
+
private static final TabularType TABULAR_TYPE;
static
{
try
{
- TYPES = new OpenType[] { LONG, STRING, BOOLEAN, LONG, BYTE, INTEGER,
- BYTE, BOOLEAN, LONG, PropertiesInfo.TABULAR_TYPE };
- TYPE = new CompositeType(MESSAGE_TYPE_NAME,
- "Information for a Message", ITEM_NAMES, ITEM_DESCRIPTIONS,
- TYPES);
+ TYPES = new OpenType[] { LONG,
+ STRING,
+ BOOLEAN,
+ LONG,
+ BYTE,
+ INTEGER,
+ BYTE,
+ BOOLEAN,
+ LONG,
+ PropertiesInfo.TABULAR_TYPE };
+ TYPE = new CompositeType(MESSAGE_TYPE_NAME, "Information for a Message", ITEM_NAMES, ITEM_DESCRIPTIONS, TYPES);
TABULAR_TYPE = new TabularType(MESSAGE_TABULAR_TYPE_NAME,
- "Information for tabular MessageInfo", TYPE,
- new String[] { "id" });
- } catch (OpenDataException e)
+ "Information for tabular MessageInfo",
+ TYPE,
+ new String[] { "id" });
+ }
+ catch (OpenDataException e)
{
e.printStackTrace();
throw new IllegalStateException(e);
@@ -89,20 +114,28 @@
// Attributes ----------------------------------------------------
private final long id;
+
private final String destination;
+
private final boolean durable;
+
private final long timestamp;
+
private final byte type;
+
private final int size;
+
private final byte priority;
+
private final boolean expired;
+
private final long expiration;
+
private PropertiesInfo properties;
// Static --------------------------------------------------------
- public static TabularData toTabularData(final MessageInfo[] infos)
- throws OpenDataException
+ public static TabularData toTabularData(final MessageInfo[] infos) throws OpenDataException
{
TabularData data = new TabularDataSupport(TABULAR_TYPE);
for (MessageInfo messageInfo : infos)
@@ -118,30 +151,36 @@
List<MessageInfo> infos = new ArrayList<MessageInfo>();
for (Object object : values)
{
- CompositeData compositeData = (CompositeData) object;
- long id = (Long) compositeData.get("id");
- String destination = (String) compositeData.get("destination");
- boolean durable = (Boolean) compositeData.get("durable");
- long timestamp = (Long) compositeData.get("timestamp");
- byte type = (Byte) compositeData.get("type");
- int size = (Integer) compositeData.get("size");
- byte priority = (Byte) compositeData.get("priority");
- boolean expired = (Boolean) compositeData.get("expired");
- long expiration = (Long) compositeData.get("expiration");
+ CompositeData compositeData = (CompositeData)object;
+ long id = (Long)compositeData.get("id");
+ String destination = (String)compositeData.get("destination");
+ boolean durable = (Boolean)compositeData.get("durable");
+ long timestamp = (Long)compositeData.get("timestamp");
+ byte type = (Byte)compositeData.get("type");
+ int size = (Integer)compositeData.get("size");
+ byte priority = (Byte)compositeData.get("priority");
+ boolean expired = (Boolean)compositeData.get("expired");
+ long expiration = (Long)compositeData.get("expiration");
- infos.add(new MessageInfo(id, destination, durable, timestamp, type,
- size, priority, expired, expiration));
+ TabularData properties = (TabularData)compositeData.get("properties");
+ PropertiesInfo propertiesInfo = PropertiesInfo.from(properties);
+ infos.add(new MessageInfo(id, destination, durable, timestamp, type, size, priority, expired, expiration, propertiesInfo));
}
- return (MessageInfo[]) infos.toArray(new MessageInfo[infos.size()]);
+ return (MessageInfo[])infos.toArray(new MessageInfo[infos.size()]);
}
// Constructors --------------------------------------------------
- public MessageInfo(final long id, final String destination,
- final boolean durable, final long timestamp, final byte type,
- final int size, final byte priority, final boolean expired,
- final long expiration)
+ public MessageInfo(final long id,
+ final String destination,
+ final boolean durable,
+ final long timestamp,
+ final byte type,
+ final int size,
+ final byte priority,
+ final boolean expired,
+ final long expiration)
{
this.id = id;
this.destination = destination;
@@ -155,6 +194,29 @@
this.properties = new PropertiesInfo();
}
+ public MessageInfo(final long id,
+ final String destination,
+ final boolean durable,
+ final long timestamp,
+ final byte type,
+ final int size,
+ final byte priority,
+ final boolean expired,
+ final long expiration,
+ final PropertiesInfo properties)
+ {
+ this.id = id;
+ this.destination = destination;
+ this.durable = durable;
+ this.timestamp = timestamp;
+ this.type = type;
+ this.size = size;
+ this.priority = priority;
+ this.expired = expired;
+ this.expiration = expiration;
+ this.properties = properties;
+ }
+
// Public --------------------------------------------------------
public long getID()
@@ -218,9 +280,17 @@
{
return new CompositeDataSupport(TYPE, ITEM_NAMES, new Object[] { id,
- destination, durable, timestamp, type, size, priority, expired,
- expiration, properties.toTabularData() });
- } catch (OpenDataException e)
+ destination,
+ durable,
+ timestamp,
+ type,
+ size,
+ priority,
+ expired,
+ expiration,
+ properties.toTabularData() });
+ }
+ catch (OpenDataException e)
{
e.printStackTrace();
return null;
@@ -230,9 +300,23 @@
@Override
public String toString()
{
- return "MessageInfo[id=" + id + ", destination=" + destination
- + ", durable=" + durable + ", timestamp=" + timestamp + ", type="
- + type + ", size=" + size + ", priority=" + priority + ", expired="
- + expired + ", expiration=" + expiration + "]";
+ return "MessageInfo[id=" + id +
+ ", destination=" +
+ destination +
+ ", durable=" +
+ durable +
+ ", timestamp=" +
+ timestamp +
+ ", type=" +
+ type +
+ ", size=" +
+ size +
+ ", priority=" +
+ priority +
+ ", expired=" +
+ expired +
+ ", expiration=" +
+ expiration +
+ "]";
}
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2009-03-16 13:37:29 UTC (rev 6092)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -1,796 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.management;
-
-import static org.jboss.messaging.tests.integration.management.ManagementControlHelper.createQueueControl;
-import static org.jboss.messaging.tests.util.RandomUtil.randomBoolean;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.management.MessageInfo;
-import org.jboss.messaging.core.management.QueueControlMBean;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A QueueControlTest
- *
- * @author jmesnil
- *
- * Created 26 nov. 2008 14:18:48
- *
- *
- */
-public class QueueControlTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service;
-
- private MBeanServer mbeanServer;
-
- private ClientSession session;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testAttributes() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString filter = new SimpleString("color = 'blue'");
- boolean durable = randomBoolean();
- boolean temporary = false;
-
- session.createQueue(address, queue, filter, durable, temporary);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(queue.toString(), queueControl.getName());
- assertEquals(address.toString(), queueControl.getAddress());
- assertEquals(filter.toString(), queueControl.getFilter());
- assertEquals(durable, queueControl.isDurable());
- assertEquals(temporary, queueControl.isTemporary());
-
- session.deleteQueue(queue);
- }
-
- public void testGetNullFilter() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, false);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(queue.toString(), queueControl.getName());
- assertEquals(null, queueControl.getFilter());
-
- session.deleteQueue(queue);
- }
-
- public void testGetDeadLetterAddress() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- final SimpleString deadLetterAddress = randomSimpleString();
-
- session.createQueue(address, queue, null, false, false);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertNull(queueControl.getDeadLetterAddress());
-
- service.getServer().getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
- {
- @Override
- public SimpleString getDeadLetterAddress()
- {
- return deadLetterAddress;
- }
- });
-
- assertEquals(deadLetterAddress.toString(), queueControl.getDeadLetterAddress());
-
- session.deleteQueue(queue);
- }
-
- public void testSetDeadLetterAddress() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- String deadLetterAddress = randomString();
-
- session.createQueue(address, queue, null, false, false);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- queueControl.setDeadLetterAddress(deadLetterAddress);
-
- assertEquals(deadLetterAddress, queueControl.getDeadLetterAddress());
-
- session.deleteQueue(queue);
- }
-
- public void testGetExpiryAddress() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- final SimpleString expiryAddress = randomSimpleString();
-
- session.createQueue(address, queue, null, false, false);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertNull(queueControl.getExpiryAddress());
-
- service.getServer().getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
- {
- @Override
- public SimpleString getExpiryAddress()
- {
- return expiryAddress;
- }
- });
-
- assertEquals(expiryAddress.toString(), queueControl.getExpiryAddress());
-
- session.deleteQueue(queue);
- }
-
- public void testSetExpiryAddress() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- String expiryAddress = randomString();
-
- session.createQueue(address, queue, null, false, false);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- queueControl.setExpiryAddress(expiryAddress);
-
- assertEquals(expiryAddress, queueControl.getExpiryAddress());
-
- session.deleteQueue(queue);
- }
-
- /**
- * <ol>
- * <li>send a message to queue</li>
- * <li>move all messages from queue to otherQueue using management method</li>
- * <li>check there is no message to consume from queue</li>
- * <li>consume the message from otherQueue</li>
- * </ol>
- */
- public void testMoveAllMessages() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString otherAddress = randomSimpleString();
- SimpleString otherQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- session.createQueue(otherAddress, otherQueue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage message = session.createClientMessage(false);
- SimpleString key = randomSimpleString();
- long value = randomLong();
- message.putLongProperty(key, value);
- producer.send(message);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(1, queueControl.getMessageCount());
-
- // moved all messages to otherQueue
- int movedMessagesCount = queueControl.moveAllMessages(otherQueue.toString());
- assertEquals(1, movedMessagesCount);
- assertEquals(0, queueControl.getMessageCount());
-
- // check there is no message to consume from queue
- consumeMessages(0, session, queue);
-
- // consume the message from otherQueue
- ClientConsumer otherConsumer = session.createConsumer(otherQueue);
- ClientMessage m = otherConsumer.receive(500);
- assertEquals(value, m.getProperty(key));
-
- m.acknowledge();
-
- session.deleteQueue(queue);
- otherConsumer.close();
- session.deleteQueue(otherQueue);
- }
-
- public void testMoveAllMessagesToUnknownQueue() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString unknownQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage message = session.createClientMessage(false);
- SimpleString key = randomSimpleString();
- long value = randomLong();
- message.putLongProperty(key, value);
- producer.send(message);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(1, queueControl.getMessageCount());
-
- // moved all messages to unknown queue
- try
- {
- queueControl.moveAllMessages(unknownQueue.toString());
- fail("operation must fail if the other queue does not exist");
- }
- catch (Exception e)
- {
- }
- assertEquals(1, queueControl.getMessageCount());
-
- consumeMessages(1, session, queue);
-
- session.deleteQueue(queue);
- }
-
- /**
- * <ol>
- * <li>send 2 message to queue</li>
- * <li>move messages from queue to otherQueue using management method <em>with filter</em></li>
- * <li>consume the message which <strong>did not</strong> matches the filter from queue</li>
- * <li>consume the message which <strong>did</strong> matches the filter from otherQueue</li>
- * </ol>
- */
-
- public void testMoveMatchingMessages() throws Exception
- {
- SimpleString key = new SimpleString("key");
- long matchingValue = randomLong();
- long unmatchingValue = matchingValue + 1;
-
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString otherAddress = randomSimpleString();
- SimpleString otherQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- session.createQueue(otherAddress, otherQueue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage matchingMessage = session.createClientMessage(false);
- matchingMessage.putLongProperty(key, matchingValue);
- producer.send(matchingMessage);
- ClientMessage unmatchingMessage = session.createClientMessage(false);
- unmatchingMessage.putLongProperty(key, unmatchingValue);
- producer.send(unmatchingMessage);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- // moved matching messages to otherQueue
- int movedMatchedMessagesCount = queueControl.moveMatchingMessages(key + " =" + matchingValue, otherQueue.toString());
- assertEquals(1, movedMatchedMessagesCount);
- assertEquals(1, queueControl.getMessageCount());
-
- // consume the unmatched message from queue
- ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage m = consumer.receive(500);
- assertNotNull(m);
- assertEquals(unmatchingValue, m.getProperty(key));
-
- // consume the matched message from otherQueue
- ClientConsumer otherConsumer = session.createConsumer(otherQueue);
- m = otherConsumer.receive(500);
- assertNotNull(m);
- assertEquals(matchingValue, m.getProperty(key));
-
- m.acknowledge();
-
- consumer.close();
- session.deleteQueue(queue);
- otherConsumer.close();
- session.deleteQueue(otherQueue);
- }
-
- public void testMoveMessage() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString otherAddress = randomSimpleString();
- SimpleString otherQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- session.createQueue(otherAddress, otherQueue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send 2 messages on queue
- producer.send(session.createClientMessage(false));
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- QueueControlMBean otherQueueControl = createQueueControl(otherAddress, otherQueue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
- assertEquals(0, otherQueueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(2, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
- boolean moved = queueControl.moveMessage(messageID, otherQueue.toString());
- assertTrue(moved);
- assertEquals(1, queueControl.getMessageCount());
- assertEquals(1, otherQueueControl.getMessageCount());
-
- consumeMessages(1, session, queue);
- consumeMessages(1, session, otherQueue);
-
- session.deleteQueue(queue);
- session.deleteQueue(otherQueue);
- }
-
- public void testMoveMessageToUnknownQueue() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString unknownQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send 2 messages on queue
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(1, queueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(1, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
-
- // moved all messages to unknown queue
- try
- {
- queueControl.moveMessage(messageID, unknownQueue.toString());
- fail("operation must fail if the other queue does not exist");
- }
- catch (Exception e)
- {
- }
- assertEquals(1, queueControl.getMessageCount());
-
- consumeMessages(1, session, queue);
-
- session.deleteQueue(queue);
- }
-
- /**
- * <ol>
- * <li>send 2 messages to queue</li>
- * <li>remove all messages using management method</li>
- * <li>check there is no message to consume from queue</li>
- * <li>consume the message from otherQueue</li>
- * </ol>
- */
- public void testRemoveAllMessages() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send 2 messages on queue
- producer.send(session.createClientMessage(false));
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- // delete all messages
- int deletedMessagesCount = queueControl.removeAllMessages();
- assertEquals(2, deletedMessagesCount);
- assertEquals(0, queueControl.getMessageCount());
-
- // check there is no message to consume from queue
- consumeMessages(0, session, queue);
-
- session.deleteQueue(queue);
- }
-
- /**
- * <ol>
- * <li>send 2 message to queue</li>
- * <li>remove messages from queue using management method <em>with filter</em></li>
- * <li>check there is only one message to consume from queue</li>
- * </ol>
- */
-
- public void testRemoveMatchingMessages() throws Exception
- {
- SimpleString key = new SimpleString("key");
- long matchingValue = randomLong();
- long unmatchingValue = matchingValue + 1;
-
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage matchingMessage = session.createClientMessage(false);
- matchingMessage.putLongProperty(key, matchingValue);
- producer.send(matchingMessage);
- ClientMessage unmatchingMessage = session.createClientMessage(false);
- unmatchingMessage.putLongProperty(key, unmatchingValue);
- producer.send(unmatchingMessage);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- // removed matching messages to otherQueue
- int removedMatchedMessagesCount = queueControl.removeMatchingMessages(key + " =" + matchingValue);
- assertEquals(1, removedMatchedMessagesCount);
- assertEquals(1, queueControl.getMessageCount());
-
- // consume the unmatched message from queue
- ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage m = consumer.receive(500);
- assertNotNull(m);
- assertEquals(unmatchingValue, m.getProperty(key));
-
- m.acknowledge();
-
- // check there is no other message to consume:
- m = consumer.receive(500);
- assertNull(m);
-
-
- consumer.close();
- session.deleteQueue(queue);
- }
-
- public void testRemoveMessage() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send 2 messages on queue
- producer.send(session.createClientMessage(false));
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(2, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
- // delete 1st message
- boolean deleted = queueControl.removeMessage(messageID);
- assertTrue(deleted);
- assertEquals(1, queueControl.getMessageCount());
-
- // check there is a single message to consume from queue
- consumeMessages(1, session, queue);
-
- session.deleteQueue(queue);
- }
-
- public void testCountMessagesWithFilter() throws Exception
- {
- SimpleString key = new SimpleString("key");
- long matchingValue = randomLong();
- long unmatchingValue = matchingValue + 1;
-
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage matchingMessage = session.createClientMessage(false);
- matchingMessage.putLongProperty(key, matchingValue);
- ClientMessage unmatchingMessage = session.createClientMessage(false);
- unmatchingMessage.putLongProperty(key, unmatchingValue);
- producer.send(matchingMessage);
- producer.send(unmatchingMessage);
- producer.send(matchingMessage);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(3, queueControl.getMessageCount());
-
- assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
- assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
-
- session.deleteQueue(queue);
- }
-
- public void testExpireMessagesWithFilter() throws Exception
- {
- SimpleString key = new SimpleString("key");
- long matchingValue = randomLong();
- long unmatchingValue = matchingValue + 1;
-
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- ClientMessage matchingMessage = session.createClientMessage(false);
- matchingMessage.putLongProperty(key, matchingValue);
- producer.send(matchingMessage);
- ClientMessage unmatchingMessage = session.createClientMessage(false);
- unmatchingMessage.putLongProperty(key, unmatchingValue);
- producer.send(unmatchingMessage);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- int expiredMessagesCount = queueControl.expireMessages(key + " =" + matchingValue);
- assertEquals(1, expiredMessagesCount);
- assertEquals(1, queueControl.getMessageCount());
-
- // consume the unmatched message from queue
- ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage m = consumer.receive(500);
- assertNotNull(m);
- assertEquals(unmatchingValue, m.getProperty(key));
-
- m.acknowledge();
-
- // check there is no other message to consume:
- m = consumer.receive(500);
- assertNull(m);
-
- consumer.close();
- session.deleteQueue(queue);
- session.close();
- }
-
- public void testExpireMessage() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString expiryAddress = randomSimpleString();
- SimpleString expiryQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- session.createQueue(expiryAddress, expiryQueue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send on queue
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- QueueControlMBean expiryQueueControl = createQueueControl(expiryAddress, expiryQueue, mbeanServer);
- assertEquals(1, queueControl.getMessageCount());
- assertEquals(0, expiryQueueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(1, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
- queueControl.setExpiryAddress(expiryAddress.toString());
- boolean expired = queueControl.expireMessage(messageID);
- assertTrue(expired);
- assertEquals(0, queueControl.getMessageCount());
- assertEquals(1, expiryQueueControl.getMessageCount());
-
- consumeMessages(0, session, queue);
- consumeMessages(1, session, expiryQueue);
-
- session.deleteQueue(queue);
- session.deleteQueue(expiryQueue);
- session.close();
- }
-
- public void testSendMessageToDeadLetterAddress() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
- SimpleString deadLetterAddress = randomSimpleString();
- SimpleString deadLetterQueue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- session.createQueue(deadLetterAddress, deadLetterQueue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- // send 2 messages on queue
- producer.send(session.createClientMessage(false));
- producer.send(session.createClientMessage(false));
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- QueueControlMBean deadLetterQueueControl = createQueueControl(deadLetterAddress, deadLetterQueue, mbeanServer);
- assertEquals(2, queueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(2, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
- queueControl.setDeadLetterAddress(deadLetterAddress.toString());
-
- assertEquals(0, deadLetterQueueControl.getMessageCount());
- boolean movedToDeadLetterAddress = queueControl.sendMessageToDeadLetterAddress(messageID);
- assertTrue(movedToDeadLetterAddress);
- assertEquals(1, queueControl.getMessageCount());
- assertEquals(1, deadLetterQueueControl.getMessageCount());
-
- // check there is a single message to consume from queue
- consumeMessages(1, session, queue);
-
- // check there is a single message to consume from deadletter queue
- consumeMessages(1, session, deadLetterQueue);
-
- session.deleteQueue(queue);
- session.deleteQueue(deadLetterQueue);
- }
-
- public void testChangeMessagePriority() throws Exception
- {
- byte originalPriority = (byte)1;
- byte newPriority = (byte)8;
-
- SimpleString address = randomSimpleString();
- SimpleString queue = randomSimpleString();
-
- session.createQueue(address, queue, null, false, true);
- ClientProducer producer = session.createProducer(address);
- session.start();
-
- ClientMessage message = session.createClientMessage(false);
- message.setPriority(originalPriority);
- producer.send(message);
-
- QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
- assertEquals(1, queueControl.getMessageCount());
-
- // the message IDs are set on the server
- MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
- assertEquals(1, messageInfos.length);
- long messageID = messageInfos[0].getID();
-
- boolean priorityChanged = queueControl.changeMessagePriority(messageID, newPriority);
- assertTrue(priorityChanged);
-
- ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage m = consumer.receive(500);
- assertNotNull(m);
- assertEquals(newPriority, m.getPriority());
-
- consumer.close();
- session.deleteQueue(queue);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- mbeanServer = MBeanServerFactory.createMBeanServer();
-
- Configuration conf = new ConfigurationImpl();
- conf.setSecurityEnabled(false);
- conf.setJMXManagementEnabled(true);
- conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- service = Messaging.newNullStorageMessagingService(conf, mbeanServer);
- service.start();
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnNonPersistentSend(true);
- session = sf.createSession(false, true, true);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- session.close();
-
- service.stop();
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- private void consumeMessages(int expected, ClientSession session, SimpleString queue) throws Exception
- {
- ClientConsumer consumer = null;
- try
- {
- consumer = session.createConsumer(queue);
- ClientMessage m = null;
- for (int i = 0; i < expected; i++)
- {
- m = consumer.receive(500);
- assertNotNull("expected to received " + expected + " messages, got only " + (i + 1), m);
- m.acknowledge();
- }
- m = consumer.receive(500);
- assertNull("received one more message than expected (" + expected + ")", m);
- } finally {
- if (consumer != null)
- {
- consumer.close();
- }
- }
- }
-
- // Inner classes -------------------------------------------------
-
-}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -0,0 +1,1137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.management;
+
+import static org.jboss.messaging.tests.integration.management.ManagementControlHelper.createMessagingServerControl;
+import static org.jboss.messaging.tests.integration.management.ManagementControlHelper.createQueueControl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomBoolean;
+import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.MessageCounterInfo;
+import org.jboss.messaging.core.management.MessageInfo;
+import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A QueueControlTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class QueueControlTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service;
+
+ protected MBeanServer mbeanServer;
+
+ protected ClientSession session;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAttributes() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString filter = new SimpleString("color = 'blue'");
+ boolean durable = randomBoolean();
+ boolean temporary = false;
+
+ session.createQueue(address, queue, filter, durable, temporary);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(queue.toString(), queueControl.getName());
+ assertEquals(address.toString(), queueControl.getAddress());
+ assertEquals(filter.toString(), queueControl.getFilter());
+ assertEquals(durable, queueControl.isDurable());
+ assertEquals(temporary, queueControl.isTemporary());
+ assertEquals(false, queueControl.isBackup());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetNullFilter() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(queue.toString(), queueControl.getName());
+ assertEquals(null, queueControl.getFilter());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetDeadLetterAddress() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ final SimpleString deadLetterAddress = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertNull(queueControl.getDeadLetterAddress());
+
+ service.getServer().getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
+ {
+ @Override
+ public SimpleString getDeadLetterAddress()
+ {
+ return deadLetterAddress;
+ }
+ });
+
+ assertEquals(deadLetterAddress.toString(), queueControl.getDeadLetterAddress());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testSetDeadLetterAddress() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ String deadLetterAddress = randomString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ queueControl.setDeadLetterAddress(deadLetterAddress);
+
+ assertEquals(deadLetterAddress, queueControl.getDeadLetterAddress());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetExpiryAddress() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ final SimpleString expiryAddress = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertNull(queueControl.getExpiryAddress());
+
+ service.getServer().getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
+ {
+ @Override
+ public SimpleString getExpiryAddress()
+ {
+ return expiryAddress;
+ }
+ });
+
+ assertEquals(expiryAddress.toString(), queueControl.getExpiryAddress());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testSetExpiryAddress() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ String expiryAddress = randomString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ queueControl.setExpiryAddress(expiryAddress);
+
+ assertEquals(expiryAddress, queueControl.getExpiryAddress());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetConsumerCount() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ assertEquals(0, queueControl.getConsumerCount());
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ assertEquals(1, queueControl.getConsumerCount());
+
+ consumer.close();
+ assertEquals(0, queueControl.getConsumerCount());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetMessageCount() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(0, queueControl.getMessageCount());
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createClientMessage(false));
+ assertEquals(1, queueControl.getMessageCount());
+
+ consumeMessages(1, session, queue);
+
+ assertEquals(0, queueControl.getMessageCount());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetMessagesAdded() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(0, queueControl.getMessagesAdded());
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createClientMessage(false));
+ assertEquals(1, queueControl.getMessagesAdded());
+ producer.send(session.createClientMessage(false));
+ assertEquals(2, queueControl.getMessagesAdded());
+
+ consumeMessages(2, session, queue);
+
+ assertEquals(2, queueControl.getMessagesAdded());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetScheduledCount() throws Exception
+ {
+ long delay = 2000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(0, queueControl.getScheduledCount());
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createClientMessage(false);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
+ producer.send(message);
+
+ assertEquals(1, queueControl.getScheduledCount());
+ consumeMessages(0, session, queue);
+
+ Thread.sleep(delay);
+
+ assertEquals(0, queueControl.getScheduledCount());
+ consumeMessages(1, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ public void testListScheduledCount() throws Exception
+ {
+ long delay = 2000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ int intValue = randomInt();
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createClientMessage(false);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
+ message.putIntProperty(new SimpleString("key"), intValue);
+ producer.send(message);
+ // unscheduled message
+ producer.send(session.createClientMessage(false));
+
+ TabularData tabularData = queueControl.listScheduledMessages();
+ assertEquals(1, tabularData.size());
+ MessageInfo[] messageInfos = MessageInfo.from(tabularData);
+ assertEquals(Integer.toString(intValue), messageInfos[0].getProperties().get("key"));
+
+ Thread.sleep(delay);
+
+ tabularData = queueControl.listScheduledMessages();
+ assertEquals(0, tabularData.size());
+
+ consumeMessages(2, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ public void testGetDeliveringCount() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(0, queueControl.getDeliveringCount());
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message = consumer.receive(500);
+ assertNotNull(message);
+ assertEquals(1, queueControl.getDeliveringCount());
+
+ message.acknowledge();
+ session.commit();
+ assertEquals(0, queueControl.getDeliveringCount());
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessages() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ int intValue = randomInt();
+ session.createQueue(address, queue, null, false, false);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createClientMessage(false);
+ message.putIntProperty(new SimpleString("key"), intValue);
+ producer.send(message);
+
+ TabularData tabularData = queueControl.listAllMessages();
+ assertEquals(1, tabularData.size());
+ MessageInfo[] messageInfos = MessageInfo.from(tabularData);
+ assertEquals(Integer.toString(intValue), messageInfos[0].getProperties().get("key"));
+
+ consumeMessages(1, session, queue);
+
+ tabularData = queueControl.listAllMessages();
+ assertEquals(0, tabularData.size());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessagesWithFilter() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+ String filter = key + " =" + matchingValue;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ TabularData tabularData = queueControl.listMessages(filter);
+ assertEquals(1, tabularData.size());
+ MessageInfo[] messageInfos = MessageInfo.from(tabularData);
+ assertEquals(Long.toString(matchingValue), messageInfos[0].getProperties().get("key"));
+
+ consumeMessages(2, session, queue);
+
+ tabularData = queueControl.listMessages(filter);
+ assertEquals(0, tabularData.size());
+
+ session.deleteQueue(queue);
+ }
+
+ /**
+ * <ol>
+ * <li>send a message to queue</li>
+ * <li>move all messages from queue to otherQueue using management method</li>
+ * <li>check there is no message to consume from queue</li>
+ * <li>consume the message from otherQueue</li>
+ * </ol>
+ */
+ public void testMoveAllMessages() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
+ SimpleString otherQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ session.createQueue(otherAddress, otherQueue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage message = session.createClientMessage(false);
+ SimpleString key = randomSimpleString();
+ long value = randomLong();
+ message.putLongProperty(key, value);
+ producer.send(message);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // moved all messages to otherQueue
+ int movedMessagesCount = queueControl.moveAllMessages(otherQueue.toString());
+ assertEquals(1, movedMessagesCount);
+ assertEquals(0, queueControl.getMessageCount());
+
+ // check there is no message to consume from queue
+ consumeMessages(0, session, queue);
+
+ // consume the message from otherQueue
+ ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+ ClientMessage m = otherConsumer.receive(500);
+ assertEquals(value, m.getProperty(key));
+
+ m.acknowledge();
+
+ session.deleteQueue(queue);
+ otherConsumer.close();
+ session.deleteQueue(otherQueue);
+ }
+
+ public void testMoveAllMessagesToUnknownQueue() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString unknownQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage message = session.createClientMessage(false);
+ SimpleString key = randomSimpleString();
+ long value = randomLong();
+ message.putLongProperty(key, value);
+ producer.send(message);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // moved all messages to unknown queue
+ try
+ {
+ queueControl.moveAllMessages(unknownQueue.toString());
+ fail("operation must fail if the other queue does not exist");
+ }
+ catch (Exception e)
+ {
+ }
+ assertEquals(1, queueControl.getMessageCount());
+
+ consumeMessages(1, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 message to queue</li>
+ * <li>move messages from queue to otherQueue using management method <em>with filter</em></li>
+ * <li>consume the message which <strong>did not</strong> matches the filter from queue</li>
+ * <li>consume the message which <strong>did</strong> matches the filter from otherQueue</li>
+ * </ol>
+ */
+
+ public void testMoveMatchingMessages() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
+ SimpleString otherQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ session.createQueue(otherAddress, otherQueue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // moved matching messages to otherQueue
+ int movedMatchedMessagesCount = queueControl.moveMatchingMessages(key + " =" + matchingValue,
+ otherQueue.toString());
+ assertEquals(1, movedMatchedMessagesCount);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // consume the unmatched message from queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(unmatchingValue, m.getProperty(key));
+
+ // consume the matched message from otherQueue
+ ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+ m = otherConsumer.receive(500);
+ assertNotNull(m);
+ assertEquals(matchingValue, m.getProperty(key));
+
+ m.acknowledge();
+
+ consumer.close();
+ session.deleteQueue(queue);
+ otherConsumer.close();
+ session.deleteQueue(otherQueue);
+ }
+
+ public void testMoveMessage() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
+ SimpleString otherQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ session.createQueue(otherAddress, otherQueue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send 2 messages on queue
+ producer.send(session.createClientMessage(false));
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ QueueControlMBean otherQueueControl = createManagementControl(otherAddress, otherQueue);
+ assertEquals(2, queueControl.getMessageCount());
+ assertEquals(0, otherQueueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(2, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ boolean moved = queueControl.moveMessage(messageID, otherQueue.toString());
+ assertTrue(moved);
+ assertEquals(1, queueControl.getMessageCount());
+ assertEquals(1, otherQueueControl.getMessageCount());
+
+ consumeMessages(1, session, queue);
+ consumeMessages(1, session, otherQueue);
+
+ session.deleteQueue(queue);
+ session.deleteQueue(otherQueue);
+ }
+
+ public void testMoveMessageToUnknownQueue() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString unknownQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send 2 messages on queue
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(1, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ // moved all messages to unknown queue
+ try
+ {
+ queueControl.moveMessage(messageID, unknownQueue.toString());
+ fail("operation must fail if the other queue does not exist");
+ }
+ catch (Exception e)
+ {
+ }
+ assertEquals(1, queueControl.getMessageCount());
+
+ consumeMessages(1, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 messages to queue</li>
+ * <li>remove all messages using management method</li>
+ * <li>check there is no message to consume from queue</li>
+ * <li>consume the message from otherQueue</li>
+ * </ol>
+ */
+ public void testRemoveAllMessages() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send 2 messages on queue
+ producer.send(session.createClientMessage(false));
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // delete all messages
+ int deletedMessagesCount = queueControl.removeAllMessages();
+ assertEquals(2, deletedMessagesCount);
+ assertEquals(0, queueControl.getMessageCount());
+
+ // check there is no message to consume from queue
+ consumeMessages(0, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 message to queue</li>
+ * <li>remove messages from queue using management method <em>with filter</em></li>
+ * <li>check there is only one message to consume from queue</li>
+ * </ol>
+ */
+
+ public void testRemoveMatchingMessages() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // removed matching messages to otherQueue
+ int removedMatchedMessagesCount = queueControl.removeMatchingMessages(key + " =" + matchingValue);
+ assertEquals(1, removedMatchedMessagesCount);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // consume the unmatched message from queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(unmatchingValue, m.getProperty(key));
+
+ m.acknowledge();
+
+ // check there is no other message to consume:
+ m = consumer.receive(500);
+ assertNull(m);
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testRemoveMessage() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send 2 messages on queue
+ producer.send(session.createClientMessage(false));
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(2, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ // delete 1st message
+ boolean deleted = queueControl.removeMessage(messageID);
+ assertTrue(deleted);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // check there is a single message to consume from queue
+ consumeMessages(1, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
+ public void testCountMessagesWithFilter() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(3, queueControl.getMessageCount());
+
+ assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
+ assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
+
+ session.deleteQueue(queue);
+ }
+
+ public void testExpireMessagesWithFilter() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ int expiredMessagesCount = queueControl.expireMessages(key + " =" + matchingValue);
+ assertEquals(1, expiredMessagesCount);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // consume the unmatched message from queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(unmatchingValue, m.getProperty(key));
+
+ m.acknowledge();
+
+ // check there is no other message to consume:
+ m = consumer.receive(500);
+ assertNull(m);
+
+ consumer.close();
+ session.deleteQueue(queue);
+ session.close();
+ }
+
+ public void testExpireMessage() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString expiryAddress = randomSimpleString();
+ SimpleString expiryQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ session.createQueue(expiryAddress, expiryQueue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send on queue
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ QueueControlMBean expiryQueueControl = createManagementControl(expiryAddress, expiryQueue);
+ assertEquals(1, queueControl.getMessageCount());
+ assertEquals(0, expiryQueueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(1, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ queueControl.setExpiryAddress(expiryAddress.toString());
+ boolean expired = queueControl.expireMessage(messageID);
+ assertTrue(expired);
+ assertEquals(0, queueControl.getMessageCount());
+ assertEquals(1, expiryQueueControl.getMessageCount());
+
+ consumeMessages(0, session, queue);
+ consumeMessages(1, session, expiryQueue);
+
+ session.deleteQueue(queue);
+ session.deleteQueue(expiryQueue);
+ session.close();
+ }
+
+ public void testSendMessageToDeadLetterAddress() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString deadLetterAddress = randomSimpleString();
+ SimpleString deadLetterQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ session.createQueue(deadLetterAddress, deadLetterQueue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ // send 2 messages on queue
+ producer.send(session.createClientMessage(false));
+ producer.send(session.createClientMessage(false));
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ QueueControlMBean deadLetterQueueControl = createManagementControl(deadLetterAddress, deadLetterQueue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(2, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ queueControl.setDeadLetterAddress(deadLetterAddress.toString());
+
+ assertEquals(0, deadLetterQueueControl.getMessageCount());
+ boolean movedToDeadLetterAddress = queueControl.sendMessageToDeadLetterAddress(messageID);
+ assertTrue(movedToDeadLetterAddress);
+ assertEquals(1, queueControl.getMessageCount());
+ assertEquals(1, deadLetterQueueControl.getMessageCount());
+
+ // check there is a single message to consume from queue
+ consumeMessages(1, session, queue);
+
+ // check there is a single message to consume from deadletter queue
+ consumeMessages(1, session, deadLetterQueue);
+
+ session.deleteQueue(queue);
+ session.deleteQueue(deadLetterQueue);
+ }
+
+ public void testChangeMessagePriority() throws Exception
+ {
+ byte originalPriority = (byte)1;
+ byte newPriority = (byte)8;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage message = session.createClientMessage(false);
+ message.setPriority(originalPriority);
+ producer.send(message);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(1, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ boolean priorityChanged = queueControl.changeMessagePriority(messageID, newPriority);
+ assertTrue(priorityChanged);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(newPriority, m.getPriority());
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testChangeMessagePriorityWithInvalidValue() throws Exception
+ {
+ byte invalidPriority = (byte)23;
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage message = session.createClientMessage(false);
+ producer.send(message);
+
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ MessageInfo[] messageInfos = MessageInfo.from(queueControl.listAllMessages());
+ assertEquals(1, messageInfos.length);
+ long messageID = messageInfos[0].getID();
+
+ try
+ {
+ queueControl.changeMessagePriority(messageID, invalidPriority);
+ fail("operation fails when priority value is < 0 or > 9");
+ }
+ catch (Exception e)
+ {
+ }
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertTrue(invalidPriority != m.getPriority());
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessageCounter() throws Exception
+ {
+ long counterPeriod = 1000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ MessagingServerControlMBean serverControl = createMessagingServerControl(mbeanServer);
+ serverControl.enableMessageCounters();
+ serverControl.setMessageCounterSamplePeriod(counterPeriod);
+
+ CompositeData compositeData = queueControl.listMessageCounter();
+ MessageCounterInfo info = MessageCounterInfo.from(compositeData);
+ assertEquals(0, info.getDepth());
+ assertEquals(0, info.getCount());
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createClientMessage(false));
+
+ Thread.sleep(counterPeriod * 2);
+ compositeData = queueControl.listMessageCounter();
+ info = MessageCounterInfo.from(compositeData);
+ assertEquals(1, info.getDepth());
+ assertEquals(1, info.getCount());
+
+ consumeMessages(1, session, queue);
+
+ Thread.sleep(counterPeriod * 2);
+ compositeData = queueControl.listMessageCounter();
+ info = MessageCounterInfo.from(compositeData);
+ assertEquals(0, info.getDepth());
+ assertEquals(1, info.getCount());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessageCounterAsHTML() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ String history = queueControl.listMessageCounterAsHTML();
+ assertNotNull(history);
+
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessageCounterHistory() throws Exception
+ {
+ long counterPeriod = 1000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ MessagingServerControlMBean serverControl = createMessagingServerControl(mbeanServer);
+ serverControl.enableMessageCounters();
+ serverControl.setMessageCounterSamplePeriod(counterPeriod);
+
+ TabularData tabularData = queueControl.listMessageCounterHistory();
+ assertEquals(1, tabularData.size());
+
+ session.deleteQueue(queue);
+ }
+
+ public void testListMessageCounterHistoryAsHTML() throws Exception
+ {
+ long counterPeriod = 1000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, false);
+ QueueControlMBean queueControl = createManagementControl(address, queue);
+
+ MessagingServerControlMBean serverControl = createMessagingServerControl(mbeanServer);
+ serverControl.enableMessageCounters();
+ serverControl.setMessageCounterSamplePeriod(counterPeriod);
+
+ String history = queueControl.listMessageCounterHistoryAsHTML();
+ assertNotNull(history);
+
+ session.deleteQueue(queue);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mbeanServer = MBeanServerFactory.createMBeanServer();
+
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ service = Messaging.newNullStorageMessagingService(conf, mbeanServer);
+ service.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnNonPersistentSend(true);
+ session = sf.createSession(false, true, false);
+ session.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ session.close();
+
+ service.stop();
+
+ super.tearDown();
+ }
+
+ protected QueueControlMBean createManagementControl(SimpleString address, SimpleString queue) throws Exception
+ {
+ QueueControlMBean queueControl = createQueueControl(address, queue, mbeanServer);
+ return queueControl;
+ }
+
+ // Private -------------------------------------------------------
+
+ private void consumeMessages(int expected, ClientSession session, SimpleString queue) throws Exception
+ {
+ ClientConsumer consumer = null;
+ try
+ {
+ consumer = session.createConsumer(queue);
+ ClientMessage m = null;
+ for (int i = 0; i < expected; i++)
+ {
+ m = consumer.receive(500);
+ assertNotNull("expected to received " + expected + " messages, got only " + (i + 1), m);
+ m.acknowledge();
+ }
+ session.commit();
+ m = consumer.receive(500);
+ assertNull("received one more message than expected (" + expected + ")", m);
+ }
+ finally
+ {
+ if (consumer != null)
+ {
+ consumer.close();
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/core/CoreQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/core/CoreQueueControlTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/core/CoreQueueControlTest.java 2009-03-16 14:24:04 UTC (rev 6093)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.management.core;
+
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientRequestor;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.management.ObjectNames;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.tests.integration.management.QueueControlTest;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A JMXQueueControlTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class CoreQueueControlTest extends QueueControlTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static String fromNullableSimpleString(SimpleString sstring)
+ {
+ if (sstring == null)
+ {
+ return null;
+ }
+ else
+ {
+ return sstring.toString();
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // QueueControlTestBase overrides --------------------------------
+
+ @Override
+ protected QueueControlMBean createManagementControl(final SimpleString address, final SimpleString queue) throws Exception
+ {
+
+ final ClientRequestor requestor = new ClientRequestor(session, DEFAULT_MANAGEMENT_ADDRESS);
+
+ return new QueueControlMBean()
+ {
+
+ private ObjectName queueON = ObjectNames.getQueueObjectName(address, queue);
+
+ public boolean changeMessagePriority(long messageID, int newPriority) throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "changeMessagePriority", messageID, newPriority);
+ ClientMessage reply = requestor.request(m);
+ return (Boolean)reply.getProperty(new SimpleString("changeMessagePriority"));
+ }
+
+ public int countMessages(String filter) throws Exception
+ {
+ return (Integer)invokOperation("countMessages", filter);
+ }
+
+ public boolean expireMessage(long messageID) throws Exception
+ {
+ return (Boolean)invokOperation("expireMessage", messageID);
+ }
+
+ public int expireMessages(String filter) throws Exception
+ {
+ return (Integer)invokOperation("expireMessages", filter);
+ }
+
+ public String getAddress()
+ {
+ return fromNullableSimpleString((SimpleString)retriveAttributeValue("Address"));
+ }
+
+ public int getConsumerCount()
+ {
+ return (Integer)retriveAttributeValue("ConsumerCount");
+ }
+
+ public String getDeadLetterAddress()
+ {
+ return fromNullableSimpleString((SimpleString)retriveAttributeValue("DeadLetterAddress"));
+ }
+
+ public int getDeliveringCount()
+ {
+ return (Integer)retriveAttributeValue("DeliveringCount");
+ }
+
+ public String getExpiryAddress()
+ {
+ return fromNullableSimpleString((SimpleString)retriveAttributeValue("ExpiryAddress"));
+ }
+
+ public String getFilter()
+ {
+ return fromNullableSimpleString((SimpleString)retriveAttributeValue("Filter"));
+ }
+
+ public int getMessageCount()
+ {
+ return (Integer)retriveAttributeValue("MessageCount");
+ }
+
+ public int getMessagesAdded()
+ {
+ return (Integer)retriveAttributeValue("MessagesAdded");
+ }
+
+ public String getName()
+ {
+ return fromNullableSimpleString((SimpleString)retriveAttributeValue("Name"));
+ }
+
+ public long getPersistenceID()
+ {
+ return (Long)retriveAttributeValue("PersistenceID");
+ }
+
+ public long getScheduledCount()
+ {
+ return (Long)retriveAttributeValue("ScheduledCount");
+ }
+
+ public boolean isBackup()
+ {
+ return (Boolean)retriveAttributeValue("Backup");
+ }
+
+ public boolean isDurable()
+ {
+ return (Boolean)retriveAttributeValue("Durable");
+ }
+
+ public boolean isTemporary()
+ {
+ return (Boolean)retriveAttributeValue("Temporary");
+ }
+
+ public TabularData listAllMessages() throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "listAllMessages");
+ ClientMessage reply = requestor.request(m);
+ return (TabularData)ManagementHelper.getTabularDataProperty(reply, "listAllMessages");
+ }
+
+ public CompositeData listMessageCounter() throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "listMessageCounter");
+ ClientMessage reply = requestor.request(m);
+ return (CompositeData)ManagementHelper.getCompositeDataProperty(reply, "listMessageCounter");
+ }
+
+ public String listMessageCounterAsHTML() throws Exception
+ {
+ return fromNullableSimpleString((SimpleString)invokOperation("listMessageCounterAsHTML"));
+ }
+
+ public TabularData listMessageCounterHistory() throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "listMessageCounterHistory");
+ ClientMessage reply = requestor.request(m);
+ return (TabularData)ManagementHelper.getTabularDataProperty(reply, "listMessageCounterHistory");
+ }
+
+ public String listMessageCounterHistoryAsHTML() throws Exception
+ {
+ return fromNullableSimpleString((SimpleString)invokOperation("listMessageCounterHistoryAsHTML"));
+ }
+
+ public TabularData listMessages(String filter) throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "listMessages", filter);
+ ClientMessage reply = requestor.request(m);
+ return (TabularData)ManagementHelper.getTabularDataProperty(reply, "listMessages");
+ }
+
+ public TabularData listScheduledMessages() throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "listScheduledMessages");
+ ClientMessage reply = requestor.request(m);
+ return (TabularData)ManagementHelper.getTabularDataProperty(reply, "listScheduledMessages");
+ }
+
+ public int moveAllMessages(String otherQueueName) throws Exception
+ {
+ return (Integer)invokOperation("moveAllMessages", otherQueueName);
+ }
+
+ public int moveMatchingMessages(String filter, String otherQueueName) throws Exception
+ {
+ return (Integer)invokOperation("moveMatchingMessages", filter, otherQueueName);
+ }
+
+ public boolean moveMessage(long messageID, String otherQueueName) throws Exception
+ {
+ return (Boolean)invokOperation("moveMessage", messageID, otherQueueName);
+ }
+
+ public int removeAllMessages() throws Exception
+ {
+ return (Integer)invokOperation("removeAllMessages");
+ }
+
+ public int removeMatchingMessages(String filter) throws Exception
+ {
+ return (Integer)invokOperation("removeMatchingMessages", filter);
+ }
+
+ public boolean removeMessage(long messageID) throws Exception
+ {
+ return (Boolean)invokOperation("removeMessage", messageID);
+ }
+
+ public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
+ {
+ return (Boolean)invokOperation("sendMessageToDeadLetterAddress", messageID);
+ }
+
+ public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "setDeadLetterAddress", deadLetterAddress);
+ requestor.request(m);
+ }
+
+ public void setExpiryAddress(String expiryAddres) throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, "setExpiryAddress", expiryAddres);
+ requestor.request(m);
+ }
+
+ private Object retriveAttributeValue(String attributeName)
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putAttributes(m, queueON, attributeName);
+ ClientMessage reply;
+ try
+ {
+ reply = requestor.request(m);
+ Object attributeValue = reply.getProperty(new SimpleString(attributeName));
+ if (attributeValue.equals(new SimpleString("null")))
+ {
+ return null;
+ }
+ return attributeValue;
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public Object invokOperation(String operationName, Object... args) throws Exception
+ {
+ ClientMessage m = session.createClientMessage(false);
+ ManagementHelper.putOperationInvocation(m, queueON, operationName, args);
+ ClientMessage reply = requestor.request(m);
+ return reply.getProperty(new SimpleString(operationName));
+ }
+ };
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list