[jboss-cvs] JBoss Messaging SVN: r5500 - in trunk: src/main/org/jboss/messaging/core/management/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 10 08:18:52 EST 2008
Author: jmesnil
Date: 2008-12-10 08:18:52 -0500 (Wed, 10 Dec 2008)
New Revision: 5500
Modified:
trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
Log:
JBMESSAGING-1045: Count Message With Selector
- added method countMessages(filter) in QueueControlMBean & JMSQueueControlMBean which returns the number of messages matching the filter in the queue
- added method countMessagesForSubscription(clientID, subName, filter) in TopicControlMBean which returns the number of messages matching the filter for the given durable subscriber
Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -89,6 +89,11 @@
@Parameter(name = "filter", desc = "A message filter") String filter)
throws Exception;
+ @Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = INFO)
+ int countMessages(
+ @Parameter(name = "filter", desc = "A message filter") String filter)
+ throws Exception;
+
@Operation(desc = "Remove all the messages from the queue", impact = ACTION)
int removeAllMessages() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -258,6 +258,13 @@
}
}
+
+ public int countMessages(final String filterStr) throws Exception {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ List<MessageReference> refs = queue.list(filter);
+ return refs.size();
+ }
+
public int removeAllMessages() throws Exception
{
try
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -173,6 +173,11 @@
{
return localQueueControl.listMessages(filter);
}
+
+ public int countMessages(final String filter) throws Exception
+ {
+ return localQueueControl.countMessages(filter);
+ }
public TabularData listScheduledMessages() throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -79,6 +79,11 @@
@Parameter(name = "filter", desc = "A JMS Message filter") String filter)
throws Exception;
+ @Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = INFO)
+ int countMessages(
+ @Parameter(name = "filter", desc = "A JMS message filter") String filter)
+ throws Exception;
+
@Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
boolean removeMessage(
@Parameter(name = "messageID", desc = "A message ID") String messageID)
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -67,6 +67,13 @@
@Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName)
throws Exception;
+ @Operation(desc = "Count the number of messages matching the filter for the given subscription")
+ public int countMessagesForSubscription(
+ @Parameter(name = "clientID", desc = "the client ID") String clientID,
+ @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName,
+ @Parameter(name = "filter", desc = "a JMS filter") String filter)
+ throws Exception;
+
// Specific API
@Operation(desc = "List all subscriptions")
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -47,7 +47,6 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.messaging.jms.client.SelectorTranslator;
@@ -281,6 +280,13 @@
}
}
+ public int countMessages(final String filterStr) throws Exception
+ {
+ Filter filter = createFilterFromJMSSelector(filterStr);
+ List<MessageReference> messageRefs = coreQueue.list(filter);
+ return messageRefs.size();
+ }
+
public boolean expireMessage(final String messageID) throws Exception
{
Filter filter = createFilterForJMSMessageID(messageID);
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -28,6 +28,8 @@
import javax.management.openmbean.TabularData;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -181,6 +183,21 @@
}
return JMSMessageInfo.toTabularData(infos);
}
+
+ public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+ {
+ String queueName = JBossTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+ SimpleString sAddress = new SimpleString(queueName);
+ Binding binding = postOffice.getBinding(sAddress);
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("No queue with name " + sAddress);
+ }
+ Queue queue = binding.getQueue();
+ Filter filter = FilterImpl.createFilter(filterStr);
+ List<MessageReference> messageRefs = queue.list(filter);
+ return messageRefs.size();
+ }
public int removeAllMessages() throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -153,6 +153,11 @@
{
return localControl.listMessages(filter);
}
+
+ public int countMessages(final String filter) throws Exception
+ {
+ return localControl.countMessages(filter);
+ }
public String getAddress()
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -118,6 +118,11 @@
{
return localControl.listMessagesForSubscription(queueName);
}
+
+ public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+ {
+ return localControl.countMessagesForSubscription(clientID, subscriptionName, filterStr);
+ }
public SubscriptionInfo[] listNonDurableSubscriptionInfos()
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -26,8 +26,6 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -92,19 +90,8 @@
return queueControl;
}
- private static Message sendMessageWithProperty(Session session, Destination destination, String key, long value) throws JMSException
- {
- MessageProducer producer = session.createProducer(destination);
- Message message = session.createMessage();
- message.setLongProperty(key, value);
- producer.send(message);
- return message;
- }
-
- // Constructors --------------------------------------------------
-
// Public --------------------------------------------------------
-
+
public void testChangeMessagePriority() throws Exception
{
byte oldPriority = (byte)1;
@@ -154,8 +141,8 @@
long unmatchingValue = matchingValue + 1;
// send 1 message
- sendMessageWithProperty(session, queue, key, unmatchingValue);
- sendMessageWithProperty(session, queue, key, matchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
// wait a little bit to give time for the message to be handled by the server
Thread.sleep(timeToSleep);
@@ -202,8 +189,8 @@
long unmatchingValue = matchingValue + 1;
// send on queue
- sendMessageWithProperty(session, queue, key, unmatchingValue);
- sendMessageWithProperty(session, queue, key, matchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
// wait a little bit to ensure the message is handled by the server
Thread.sleep(timeToSleep);
@@ -277,8 +264,8 @@
long unmatchingValue = matchingValue + 1;
// send on queue
- sendMessageWithProperty(session, queue, key, unmatchingValue);
- sendMessageWithProperty(session, queue, key, matchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
// wait a little bit to ensure the message is handled by the server
Thread.sleep(timeToSleep );
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -41,6 +41,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import java.lang.management.ManagementFactory;
@@ -60,6 +61,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.jms.JBossQueue;
@@ -284,6 +286,32 @@
assertEquals(messageID, message.getJMSMessageID());
}
+ public void testCountMessagesWithFilter() throws Exception
+ {
+ String key = "key";
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ JMSQueueControlMBean queueControl = createQueueControl(queue);
+
+ Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+ JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
+
+ // wait a little bit to give time for the message to be handled by the server
+ Thread.sleep(200);
+
+ assertEquals(3, queueControl.getMessageCount());
+
+ assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
+ assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
+
+ session.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -46,6 +46,7 @@
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -222,6 +223,15 @@
}
}
+ public static Message sendMessageWithProperty(Session session, Destination destination, String key, long value) throws JMSException
+ {
+ MessageProducer producer = session.createProducer(destination);
+ Message message = session.createMessage();
+ message.setLongProperty(key, value);
+ producer.send(message);
+ return message;
+ }
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -22,10 +22,13 @@
package org.jboss.messaging.tests.integration.jms.management;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import java.lang.management.ManagementFactory;
+import javax.jms.Connection;
+import javax.jms.Session;
import javax.jms.Topic;
import javax.management.MBeanServerInvocationHandler;
@@ -34,6 +37,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.jms.JBossTopic;
@@ -133,6 +137,33 @@
assertEquals(2, topicControl.listDurableSubscriptionInfos().length);
}
+ public void testCountMessagesForSubscription() throws Exception
+ {
+ String key = "key";
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ JMSUtil.createDurableSubscriber(topic, clientID, subscriptionName);
+
+ Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ connection.setClientID(clientID);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JMSUtil.sendMessageWithProperty(session, topic, key, matchingValue);
+ JMSUtil.sendMessageWithProperty(session, topic, key, unmatchingValue);
+ JMSUtil.sendMessageWithProperty(session, topic, key, matchingValue);
+
+ TopicControlMBean topicControl = createTopicControl(topic);
+
+ // wait a little bit to give time for the message to be handled by the server
+ Thread.sleep(200);
+
+ assertEquals(3, topicControl.getMessageCount());
+
+ assertEquals(2, topicControl.countMessagesForSubscription(clientID, subscriptionName, key + " =" + matchingValue));
+ assertEquals(1, topicControl.countMessagesForSubscription(clientID, subscriptionName, key + " =" + unmatchingValue));
+ }
+
public void testDropDurableSubscriptionWithExistingSubscription() throws Exception
{
JMSUtil.createDurableSubscriber(topic, clientID, subscriptionName);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2008-12-10 13:18:52 UTC (rev 5500)
@@ -312,6 +312,43 @@
session.close();
}
+ public void testCountMessagesWithFilter() throws Exception
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = sf.createSession(false, true, true);
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true, true);
+ ClientProducer producer = session.createProducer(address);
+ session.start();
+
+ // send on queue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+
+ // wait a little bit to ensure the message is handled by the server
+ Thread.sleep(100);
+ QueueControlMBean queueControl = createQueueControl(address, queue);
+ assertEquals(3, queueControl.getMessageCount());
+
+ assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
+ assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
+
+ session.deleteQueue(queue);
+ session.close();
+ }
+
public void testExpireMessagesWithFilter() throws Exception
{
SimpleString key = new SimpleString("key");
More information about the jboss-cvs-commits
mailing list