[hornetq-commits] JBoss hornetq SVN: r9703 - in branches/hornetq-416: src/main/org/hornetq/jms/management/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 21 05:09:50 EDT 2010


Author: jmesnil
Date: 2010-09-21 05:09:50 -0400 (Tue, 21 Sep 2010)
New Revision: 9703

Modified:
   branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
   branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
   branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
   branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
   branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
   branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-416

* add messagesAdded and deliveringCount attribute to JMS DestinationControl

Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -47,6 +47,16 @@
     */
    int getMessageCount() throws Exception;
 
+   /**
+    * Returns the number of messages that this queue is currently delivering to its consumers.
+    */
+   int getDeliveringCount();
+
+   /**
+    * Returns the number of messages added to this queue since it was created.
+    */
+   long getMessagesAdded();
+
    // Operations ----------------------------------------------------
 
    /**

Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -51,11 +51,6 @@
    void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue") String deadLetterAddress) throws Exception;
 
    /**
-    * Returns the number of messages added to this queue since it was created.
-    */
-   long getMessagesAdded();
-
-   /**
     * Returns the number of scheduled messages in this queue.
     */
    long getScheduledCount();
@@ -66,11 +61,6 @@
    int getConsumerCount();
 
    /**
-    * Returns the number of messages that this queue is currently delivering to its consumers.
-    */
-   int getDeliveringCount();
-
-   /**
     * returns the selector for the queue
     */
    String getSelector();

Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -118,6 +118,28 @@
       return getMessageCount(DurabilityType.ALL);
    }
 
+   public int getDeliveringCount()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getDeliveringCount();
+      }
+      return count;
+   }
+   
+   public long getMessagesAdded()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getMessagesAdded();
+      }
+      return count;
+   }
+   
    public int getDurableMessageCount()
    {
       return getMessageCount(DurabilityType.DURABLE);

Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -77,26 +77,36 @@
    }
 
    static MessageConsumer createConsumer(final Connection connection,
+                                         final Destination destination) throws JMSException
+   {
+      return createConsumer(connection, destination, Session.AUTO_ACKNOWLEDGE);
+   }
+   
+   static MessageConsumer createConsumer(final Connection connection,
                                          final Destination destination,
-                                         final String connectorFactory) throws JMSException
+                                         int ackMode) throws JMSException
    {
-      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session s = connection.createSession(false, ackMode);
 
       return s.createConsumer(destination);
    }
 
-   public static MessageConsumer createConsumer(final Connection connection, final Destination destination) throws JMSException
+   static TopicSubscriber createDurableSubscriber(final Connection connection,
+                                                  final Topic topic,
+                                                  final String clientID,
+                                                  final String subscriptionName) throws JMSException
    {
-      return JMSUtil.createConsumer(connection, destination, InVMConnectorFactory.class.getName());
+      return createDurableSubscriber(connection, topic, clientID, subscriptionName, Session.AUTO_ACKNOWLEDGE);
    }
-
+   
    static TopicSubscriber createDurableSubscriber(final Connection connection,
                                                   final Topic topic,
                                                   final String clientID,
-                                                  final String subscriptionName) throws JMSException
+                                                  final String subscriptionName,
+                                                  final int ackMode) throws JMSException
    {
       connection.setClientID(clientID);
-      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session s = connection.createSession(false, ackMode);
 
       return s.createDurableSubscriber(topic, subscriptionName);
    }

Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -13,15 +13,22 @@
 
 package org.hornetq.tests.integration.jms.server.management;
 
+import static junit.framework.Assert.assertEquals;
+
 import java.util.Map;
 
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.TopicSubscriber;
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.management.SubscriptionInfo;
 import org.hornetq.api.jms.management.TopicControl;
@@ -406,7 +413,77 @@
       {
       }
    }
+   
+   public void testGetMessagesAdded() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createConsumer(connection_1, topic);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
 
+      TopicControl topicControl = createManagementControl();
+
+      Assert.assertEquals(0, topicControl.getMessagesAdded());
+
+      JMSUtil.sendMessages(topic, 2);
+
+      Assert.assertEquals(3 * 2, topicControl.getMessagesAdded());
+
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+   
+   public void testGetMessagesDelivering() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+      TopicControl topicControl = createManagementControl();
+
+      assertEquals(0, topicControl.getDeliveringCount());
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(0, topicControl.getDeliveringCount());
+      
+      connection_1.start();
+      connection_2.start();
+      connection_3.start();
+
+      Message msg_1 = null;
+      Message msg_2 = null;
+      Message msg_3 = null;
+      for (int i = 0; i < 2; i++)
+      {
+         msg_1 = cons_1.receive(5000);
+         assertNotNull(msg_1);
+         msg_2 = cons_2.receive(5000);
+         assertNotNull(msg_2);
+         msg_3 = cons_3.receive(5000);         
+         assertNotNull(msg_3);
+      }
+
+      assertEquals(3 * 2, topicControl.getDeliveringCount());
+
+      msg_1.acknowledge();
+      assertEquals(2 * 2, topicControl.getDeliveringCount());
+      msg_2.acknowledge();
+      assertEquals(1 * 2, topicControl.getDeliveringCount());
+      msg_3.acknowledge();
+      assertEquals(0, topicControl.getDeliveringCount());
+      
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java	2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java	2010-09-21 09:09:50 UTC (rev 9703)
@@ -13,9 +13,12 @@
 
 package org.hornetq.tests.integration.jms.server.management;
 
+import static junit.framework.Assert.assertEquals;
 import static org.hornetq.tests.util.RandomUtil.randomString;
 
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -26,6 +29,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.ResourceNames;
 import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.TopicControl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -333,6 +337,71 @@
       }
    }
 
+   public void testGetMessagesAdded() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createConsumer(connection_1, topic);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+
+      assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(3 * 2, proxy.retrieveAttributeValue("messagesAdded"));
+
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
+   
+   public void testGetMessagesDelivering() throws Exception
+   {
+      Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+      Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+      JMSUtil.sendMessages(topic, 2);
+
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+      
+      connection_1.start();
+      connection_2.start();
+      connection_3.start();
+
+      Message msg_1 = null;
+      Message msg_2 = null;
+      Message msg_3 = null;
+      for (int i = 0; i < 2; i++)
+      {
+         msg_1 = cons_1.receive(5000);
+         assertNotNull(msg_1);
+         msg_2 = cons_2.receive(5000);
+         assertNotNull(msg_2);
+         msg_3 = cons_3.receive(5000);         
+         assertNotNull(msg_3);
+      }
+
+      assertEquals(3 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+
+      msg_1.acknowledge();
+      assertEquals(2 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+      msg_2.acknowledge();
+      assertEquals(1 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+      msg_3.acknowledge();
+      assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+      
+      connection_1.close();
+      connection_2.close();
+      connection_3.close();
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list