[hornetq-commits] JBoss hornetq SVN: r10072 - in trunk: src/main/org/hornetq/core/management/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 27 12:51:48 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-27 12:51:48 -0500 (Mon, 27 Dec 2010)
New Revision: 10072

Modified:
   trunk/src/main/org/hornetq/api/core/management/QueueControl.java
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/server/ServerConsumer.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
Log:
HORNETQ-586 - adding consumer information to listSubscriptions

Modified: trunk/src/main/org/hornetq/api/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/QueueControl.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/api/core/management/QueueControl.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -281,6 +281,9 @@
     */
    @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION)
    void resume() throws Exception;
+   
+   @Operation(desc = "List all the existent consumers on the Queue")
+   String listConsumersAsJSON() throws Exception;
 
    /**
     * Returns whether the queue is paused.

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -14,6 +14,7 @@
 package org.hornetq.core.management.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +34,10 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.utils.json.JSONArray;
@@ -328,7 +331,7 @@
    public void setExpiryAddress(final String expiryAddress) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -352,7 +355,7 @@
    public Map<String, Object>[] listScheduledMessages() throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -375,7 +378,7 @@
    public String listScheduledMessagesAsJSON() throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -390,12 +393,12 @@
    public Map<String, Object>[] listMessages(final String filterStr) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
-         List<Map<String, Object>> messages = new ArrayList<Map<String,Object>>();
+         List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
          queue.blockOnExecutorFuture();
          Iterator<MessageReference> iterator = queue.iterator();
          while (iterator.hasNext())
@@ -422,7 +425,7 @@
    public String listMessagesAsJSON(final String filter) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -437,7 +440,7 @@
    public long countMessages(final String filterStr) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -455,7 +458,7 @@
                MessageReference ref = (MessageReference)iterator.next();
                if (filter.match(ref.getMessage()))
                {
-                  count ++;
+                  count++;
                }
             }
             return count;
@@ -470,7 +473,7 @@
    public boolean removeMessage(final long messageID) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -521,7 +524,7 @@
    public int expireMessages(final String filterStr) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -541,7 +544,7 @@
    public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -645,7 +648,7 @@
    public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -665,7 +668,7 @@
    public String listMessageCounter()
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -684,7 +687,7 @@
    public void resetMessageCounter()
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -699,7 +702,7 @@
    public String listMessageCounterAsHTML()
    {
       checkStarted();
-      
+
       clearIO();
       try
       {
@@ -786,13 +789,52 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.management.QueueControl#listConsumersAsJSON()
+    */
+   public String listConsumersAsJSON() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         Collection<Consumer> consumers = queue.getConsumers();
+
+         JSONArray jsonArray = new JSONArray();
+
+         for (Consumer consumer : consumers)
+         {
+
+            if (consumer instanceof ServerConsumer)
+            {
+               ServerConsumer serverConsumer = (ServerConsumer)consumer;
+
+               JSONObject obj = new JSONObject();
+               obj.put("consumerID", serverConsumer.getID());
+               obj.put("connectionID", serverConsumer.getConnectionID().toString());
+               obj.put("browseOnly", serverConsumer.isBrowseOnly());
+               obj.put("creationTime", serverConsumer.getCreationTime());
+               
+               jsonArray.put(obj);
+            }
+
+         }
+
+         return jsonArray.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
    @Override
    MBeanOperationInfo[] fillMBeanOperationInfo()
    {
       return MBeanInfoHelper.getMBeanOperationsInfo(QueueControl.class);
    }
 
-   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -806,6 +848,6 @@
          throw new IllegalStateException("HornetQ Server is not started. Queue can not be managed yet");
       }
    }
-   
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -27,6 +27,8 @@
 public interface ServerConsumer extends Consumer
 {
    long getID();
+   
+   Object getConnectionID();
 
    void close(boolean failed) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -201,6 +201,11 @@
    {
       return creationTime;
    }
+   
+   public String getConnectionID()
+   {
+      return this.session.getConnectionID().toString();
+   }
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {

Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -57,7 +57,7 @@
    private final AddressControl addressControl;
 
    private final ManagementService managementService;
-   
+
    private final JMSServerManager jmsServerManager;
 
    // Static --------------------------------------------------------
@@ -83,7 +83,7 @@
    }
 
    // TopicControlMBean implementation ------------------------------
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.api.jms.management.JMSQueueControl#addJNDI(java.lang.String)
     */
@@ -91,13 +91,12 @@
    {
       jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi);
    }
-   
+
    public String[] getJNDIBindings()
    {
-       return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
+      return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
    }
 
-
    public String getName()
    {
       return managedTopic.getName();
@@ -128,7 +127,7 @@
       }
       return count;
    }
-   
+
    public long getMessagesAdded()
    {
       List<QueueControl> queues = getQueues(DurabilityType.ALL);
@@ -139,7 +138,7 @@
       }
       return count;
    }
-   
+
    public int getDurableMessageCount()
    {
       return getMessageCount(DurabilityType.DURABLE);
@@ -291,7 +290,7 @@
          if (queue.isDurable())
          {
             Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
-                                                                                                   .toString());
+                                                                                                         .toString());
             clientID = pair.a;
             subName = pair.b;
          }
@@ -312,36 +311,46 @@
 
    private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
    {
-      List<QueueControl> queues = getQueues(durability);
-      JSONArray array = new JSONArray();
-
-      for (QueueControl queue : queues)
+      try
       {
-         String clientID = null;
-         String subName = null;
+         List<QueueControl> queues = getQueues(durability);
+         JSONArray array = new JSONArray();
 
-         if (queue.isDurable())
+         for (QueueControl queue : queues)
          {
-            Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
-                                                                                                   .toString());
-            clientID = pair.a;
-            subName = pair.b;
+            String clientID = null;
+            String subName = null;
+
+            if (queue.isDurable())
+            {
+               Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
+                                                                                                            .toString());
+               clientID = pair.a;
+               subName = pair.b;
+            }
+
+            String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+            JSONObject info = new JSONObject();
+
+            info.put("queueName", queue.getName());
+            info.put("clientID", clientID);
+            info.put("selector", filter);
+            info.put("name", subName);
+            info.put("durable", queue.isDurable());
+            info.put("messageCount", queue.getMessageCount());
+            info.put("deliveringCount", queue.getDeliveringCount());
+            info.put("consumers", new JSONArray(queue.listConsumersAsJSON()) );
+            array.put(info);
          }
 
-         String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
-         JSONObject info = new JSONObject();
-         info.put("queueName", queue.getName());
-         info.put("clientID", clientID);
-         info.put("selector", filter);
-         info.put("name", subName);
-         info.put("durable", queue.isDurable());
-         info.put("messageCount", queue.getMessageCount());
-         info.put("deliveringCount", queue.getDeliveringCount());
-         array.put(info);
+         return array.toString();
       }
-
-      return array.toString();
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return e.toString();
+      }
    }
 
    private int getMessageCount(final DurabilityType durability)

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -13,8 +13,6 @@
 
 package org.hornetq.tests.integration.jms.server.management;
 
-import static junit.framework.Assert.assertEquals;
-
 import java.util.Map;
 
 import javax.jms.Connection;
@@ -25,10 +23,7 @@
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.management.SubscriptionInfo;
 import org.hornetq.api.jms.management.TopicControl;
@@ -145,16 +140,22 @@
    {
       // 1 non-durable subscriber, 2 durable subscribers
       Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createConsumer(connection_1, topic);
+      MessageConsumer cons = JMSUtil.createConsumer(connection_1, topic);
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+      TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
       Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
       Assert.assertEquals(1, topicControl.listNonDurableSubscriptions().length);
       Assert.assertEquals(2, topicControl.listDurableSubscriptions().length);
+      
+      String json = topicControl.listAllSubscriptionsAsJSON();
+      System.out.println("Json: " + json);
+      JSONArray jsonArray = new JSONArray(json);
+      
+      assertEquals(3, jsonArray.length());
 
       connection_1.close();
       connection_2.close();

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -36,6 +36,7 @@
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
 
 /**
  * A QueueControlTest
@@ -53,6 +54,7 @@
    protected HornetQServer server;
 
    protected ClientSession session;
+
    private ServerLocator locator;
 
    // Static --------------------------------------------------------
@@ -196,6 +198,37 @@
       session.deleteQueue(queue);
    }
 
+   public void testGetConsumerJSON() throws Exception
+   {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      Assert.assertEquals(0, queueControl.getConsumerCount());
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      Assert.assertEquals(1, queueControl.getConsumerCount());
+
+      
+      System.out.println("Consumers: " + queueControl.listConsumersAsJSON());
+      
+      JSONArray obj = new JSONArray(queueControl.listConsumersAsJSON());
+
+      assertEquals(1, obj.length());
+
+      consumer.close();
+      Assert.assertEquals(0, queueControl.getConsumerCount());
+
+      obj = new JSONArray(queueControl.listConsumersAsJSON());
+
+      assertEquals(0, obj.length());
+
+      session.deleteQueue(queue);
+   }
+
    public void testGetMessageCount() throws Exception
    {
       SimpleString address = RandomUtil.randomSimpleString();
@@ -374,7 +407,7 @@
       ClientMessage message = session.createMessage(false);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
-      
+
       String jsonString = queueControl.listMessagesAsJSON(null);
       Assert.assertNotNull(jsonString);
       JSONArray array = new JSONArray(jsonString);

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2010-12-23 02:11:49 UTC (rev 10071)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2010-12-27 17:51:48 UTC (rev 10072)
@@ -261,6 +261,11 @@
             return (Boolean)proxy.invokeOperation("isPaused");
          }
 
+         public String listConsumersAsJSON() throws Exception
+         {
+            return (String)proxy.invokeOperation("listConsumersAsJSON");
+         }
+
       };
    }
 



More information about the hornetq-commits mailing list