[jboss-cvs] JBoss Messaging SVN: r5500 - in trunk: src/main/org/jboss/messaging/core/management/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 10 08:18:52 EST 2008


Author: jmesnil
Date: 2008-12-10 08:18:52 -0500 (Wed, 10 Dec 2008)
New Revision: 5500

Modified:
   trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
   trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
Log:
JBMESSAGING-1045: Count Message With Selector

- added method countMessages(filter) in QueueControlMBean & JMSQeueControlMBean which returns the number of messages matching the filter in the queue
- added method countMessagesForSubscription(clientID, subName, filter) in TopicControlMBean which returns the number of messages matching the filter for the given durable subscriber


Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -89,6 +89,11 @@
          @Parameter(name = "filter", desc = "A message filter") String filter)
          throws Exception;
 
+   @Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = INFO)
+   int countMessages(
+         @Parameter(name = "filter", desc = "A message filter") String filter)
+         throws Exception;
+
    @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
    int removeAllMessages() throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -258,6 +258,13 @@
       }
    }
 
+
+   public int countMessages(final String filterStr) throws Exception {
+      Filter filter = FilterImpl.createFilter(filterStr);
+      List<MessageReference> refs = queue.list(filter);
+      return refs.size();
+   }
+   
    public int removeAllMessages() throws Exception
    {
       try

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -173,6 +173,11 @@
    {
       return localQueueControl.listMessages(filter);
    }
+   
+   public int countMessages(final String filter) throws Exception
+   {
+      return localQueueControl.countMessages(filter);
+   }
 
    public TabularData listScheduledMessages() throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -79,6 +79,11 @@
          @Parameter(name = "filter", desc = "A JMS Message filter") String filter)
          throws Exception;
 
+   @Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = INFO)
+   int countMessages(
+         @Parameter(name = "filter", desc = "A JMS message filter") String filter)
+         throws Exception;
+
    @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
    boolean removeMessage(
          @Parameter(name = "messageID", desc = "A message ID") String messageID)

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/TopicControlMBean.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -67,6 +67,13 @@
          @Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName)
          throws Exception;
 
+   @Operation(desc = "Count the number of messages matching the filter for the given subscription")
+   public int countMessagesForSubscription(
+         @Parameter(name = "clientID", desc = "the client ID") String clientID,
+         @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName,
+         @Parameter(name = "filter", desc = "a JMS filter") String filter)
+         throws Exception;
+
    // Specific API
 
    @Operation(desc = "List all subscriptions")

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -47,7 +47,6 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.jms.JBossDestination;
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.jms.client.JBossMessage;
 import org.jboss.messaging.jms.client.SelectorTranslator;
@@ -281,6 +280,13 @@
       }
    }
 
+   public int countMessages(final String filterStr) throws Exception
+   {
+      Filter filter = createFilterFromJMSSelector(filterStr);
+      List<MessageReference> messageRefs = coreQueue.list(filter);
+      return messageRefs.size();
+   }
+   
    public boolean expireMessage(final String messageID) throws Exception
    {
       Filter filter = createFilterForJMSMessageID(messageID);

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -28,6 +28,8 @@
 
 import javax.management.openmbean.TabularData;
 
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -181,6 +183,21 @@
       }
       return JMSMessageInfo.toTabularData(infos);
    }
+   
+   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+   {
+      String queueName = JBossTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+      SimpleString sAddress = new SimpleString(queueName);
+      Binding binding = postOffice.getBinding(sAddress);
+      if (binding == null)
+      {
+         throw new IllegalArgumentException("No queue with name " + sAddress);
+      }
+      Queue queue = binding.getQueue();
+      Filter filter = FilterImpl.createFilter(filterStr);
+      List<MessageReference> messageRefs = queue.list(filter);
+      return messageRefs.size();
+   }
 
    public int removeAllMessages() throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -153,6 +153,11 @@
    {
       return localControl.listMessages(filter);
    }
+   
+   public int countMessages(final String filter) throws Exception
+   {
+      return localControl.countMessages(filter);
+   }
 
    public String getAddress()
    {

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareTopicControlWrapper.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -118,6 +118,11 @@
    {
       return localControl.listMessagesForSubscription(queueName);
    }
+   
+   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+   {
+      return localControl.countMessagesForSubscription(clientID, subscriptionName, filterStr);
+   }
 
    public SubscriptionInfo[] listNonDurableSubscriptionInfos()
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -26,8 +26,6 @@
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -92,19 +90,8 @@
       return queueControl;
    }
    
-   private static Message sendMessageWithProperty(Session session, Destination destination, String key, long value) throws JMSException
-   {
-      MessageProducer producer = session.createProducer(destination);
-      Message message = session.createMessage();
-      message.setLongProperty(key, value);
-      producer.send(message);
-      return message;
-   }
-
-   // Constructors --------------------------------------------------
-
    // Public --------------------------------------------------------
-
+   
    public void testChangeMessagePriority() throws Exception
    {
       byte oldPriority = (byte)1;
@@ -154,8 +141,8 @@
       long unmatchingValue = matchingValue + 1;
 
       // send 1 message
-      sendMessageWithProperty(session, queue, key, unmatchingValue);
-      sendMessageWithProperty(session, queue, key, matchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
 
       // wiat a little bit to give time for the message to be handled by the server
       Thread.sleep(timeToSleep);
@@ -202,8 +189,8 @@
       long unmatchingValue = matchingValue + 1;
 
       // send on queue
-      sendMessageWithProperty(session, queue, key, unmatchingValue);
-      sendMessageWithProperty(session, queue, key, matchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
 
       // wait a little bit to ensure the message is handled by the server
       Thread.sleep(timeToSleep);
@@ -277,8 +264,8 @@
       long unmatchingValue = matchingValue + 1;
 
       // send on queue
-      sendMessageWithProperty(session, queue, key, unmatchingValue);
-      sendMessageWithProperty(session, queue, key, matchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
 
       // wait a little bit to ensure the message is handled by the server
       Thread.sleep(timeToSleep );

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -41,6 +41,7 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
 import java.lang.management.ManagementFactory;
@@ -60,6 +61,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.jms.JBossQueue;
@@ -284,6 +286,32 @@
       assertEquals(messageID, message.getJMSMessageID());
    }
 
+   public void testCountMessagesWithFilter() throws Exception
+   {
+      String key = "key";
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      JMSQueueControlMBean queueControl = createQueueControl(queue);
+
+      Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, unmatchingValue);
+      JMSUtil.sendMessageWithProperty(session, queue, key, matchingValue);
+
+      // wiat a little bit to give time for the message to be handled by the server
+      Thread.sleep(200);
+
+      assertEquals(3, queueControl.getMessageCount());
+
+      assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
+      assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
+      
+      session.close();
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -46,6 +46,7 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -222,6 +223,15 @@
       }
    }
 
+   public static Message sendMessageWithProperty(Session session, Destination destination, String key, long value) throws JMSException
+   {
+      MessageProducer producer = session.createProducer(destination);
+      Message message = session.createMessage();
+      message.setLongProperty(key, value);
+      producer.send(message);
+      return message;
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/TopicControlTest.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -22,10 +22,13 @@
 
 package org.jboss.messaging.tests.integration.jms.management;
 
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
 import java.lang.management.ManagementFactory;
 
+import javax.jms.Connection;
+import javax.jms.Session;
 import javax.jms.Topic;
 import javax.management.MBeanServerInvocationHandler;
 
@@ -34,6 +37,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.jms.JBossTopic;
@@ -133,6 +137,33 @@
       assertEquals(2, topicControl.listDurableSubscriptionInfos().length);
    }
    
+   public void testCountMessagesForSubscription() throws Exception
+   {
+      String key = "key";
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      JMSUtil.createDurableSubscriber(topic, clientID, subscriptionName);
+      
+      Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      connection.setClientID(clientID);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      JMSUtil.sendMessageWithProperty(session, topic, key, matchingValue);
+      JMSUtil.sendMessageWithProperty(session, topic, key, unmatchingValue);
+      JMSUtil.sendMessageWithProperty(session, topic, key, matchingValue);
+      
+      TopicControlMBean topicControl = createTopicControl(topic);
+      
+      // wiat a little bit to give time for the message to be handled by the server
+      Thread.sleep(200);
+
+      assertEquals(3, topicControl.getMessageCount());
+
+      assertEquals(2, topicControl.countMessagesForSubscription(clientID, subscriptionName, key + " =" + matchingValue));
+      assertEquals(1, topicControl.countMessagesForSubscription(clientID, subscriptionName, key + " =" + unmatchingValue));
+   }
+   
    public void testDropDurableSubscriptionWithExistingSubscription() throws Exception
    {
       JMSUtil.createDurableSubscriber(topic, clientID, subscriptionName);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java	2008-12-10 10:49:10 UTC (rev 5499)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java	2008-12-10 13:18:52 UTC (rev 5500)
@@ -312,6 +312,43 @@
       session.close();
    }
    
+   public void testCountMessagesWithFilter() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      ClientSession session = sf.createSession(false, true, true);
+
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+
+      session.createQueue(address, queue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+      session.start();
+
+      // send on queue
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(matchingMessage);
+      producer.send(unmatchingMessage);
+      producer.send(matchingMessage);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(100);
+      QueueControlMBean queueControl = createQueueControl(address, queue);
+      assertEquals(3, queueControl.getMessageCount());
+
+      assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
+      assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
+
+      session.deleteQueue(queue);
+      session.close();
+   }
+   
    public void testExpireMessagesWithFilter() throws Exception
    {
       SimpleString key = new SimpleString("key");




More information about the jboss-cvs-commits mailing list