Author: clebert.suconic(a)jboss.com
Date: 2010-12-27 12:51:48 -0500 (Mon, 27 Dec 2010)
New Revision: 10072
Modified:
trunk/src/main/org/hornetq/api/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
Log:
HORNETQ-586 - adding consumer information on listSubscriptions
Modified: trunk/src/main/org/hornetq/api/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/QueueControl.java 2010-12-23 02:11:49
UTC (rev 10071)
+++ trunk/src/main/org/hornetq/api/core/management/QueueControl.java 2010-12-27 17:51:48
UTC (rev 10072)
@@ -281,6 +281,9 @@
*/
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of
paused state.", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
+
+ @Operation(desc = "List all the existent consumers on the Queue")
+ String listConsumersAsJSON() throws Exception;
/**
* Returns whether the queue is paused.
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-23
02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -14,6 +14,7 @@
package org.hornetq.core.management.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,8 +34,10 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.utils.json.JSONArray;
@@ -328,7 +331,7 @@
public void setExpiryAddress(final String expiryAddress) throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -352,7 +355,7 @@
public Map<String, Object>[] listScheduledMessages() throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -375,7 +378,7 @@
public String listScheduledMessagesAsJSON() throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -390,12 +393,12 @@
public Map<String, Object>[] listMessages(final String filterStr) throws
Exception
{
checkStarted();
-
+
clearIO();
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<Map<String, Object>> messages = new
ArrayList<Map<String,Object>>();
+ List<Map<String, Object>> messages = new ArrayList<Map<String,
Object>>();
queue.blockOnExecutorFuture();
Iterator<MessageReference> iterator = queue.iterator();
while (iterator.hasNext())
@@ -422,7 +425,7 @@
public String listMessagesAsJSON(final String filter) throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -437,7 +440,7 @@
public long countMessages(final String filterStr) throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -455,7 +458,7 @@
MessageReference ref = (MessageReference)iterator.next();
if (filter.match(ref.getMessage()))
{
- count ++;
+ count++;
}
}
return count;
@@ -470,7 +473,7 @@
public boolean removeMessage(final long messageID) throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -521,7 +524,7 @@
public int expireMessages(final String filterStr) throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -541,7 +544,7 @@
public boolean moveMessage(final long messageID, final String otherQueueName) throws
Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -645,7 +648,7 @@
public boolean changeMessagePriority(final long messageID, final int newPriority)
throws Exception
{
checkStarted();
-
+
clearIO();
try
{
@@ -665,7 +668,7 @@
public String listMessageCounter()
{
checkStarted();
-
+
clearIO();
try
{
@@ -684,7 +687,7 @@
public void resetMessageCounter()
{
checkStarted();
-
+
clearIO();
try
{
@@ -699,7 +702,7 @@
public String listMessageCounterAsHTML()
{
checkStarted();
-
+
clearIO();
try
{
@@ -786,13 +789,52 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.management.QueueControl#listConsumersAsJSON()
+ */
+ public String listConsumersAsJSON() throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+ try
+ {
+ Collection<Consumer> consumers = queue.getConsumers();
+
+ JSONArray jsonArray = new JSONArray();
+
+ for (Consumer consumer : consumers)
+ {
+
+ if (consumer instanceof ServerConsumer)
+ {
+ ServerConsumer serverConsumer = (ServerConsumer)consumer;
+
+ JSONObject obj = new JSONObject();
+ obj.put("consumerID", serverConsumer.getID());
+ obj.put("connectionID",
serverConsumer.getConnectionID().toString());
+ obj.put("browseOnly", serverConsumer.isBrowseOnly());
+ obj.put("creationTime", serverConsumer.getCreationTime());
+
+ jsonArray.put(obj);
+ }
+
+ }
+
+ return jsonArray.toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
@Override
MBeanOperationInfo[] fillMBeanOperationInfo()
{
return MBeanInfoHelper.getMBeanOperationsInfo(QueueControl.class);
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -806,6 +848,6 @@
throw new IllegalStateException("HornetQ Server is not started. Queue can
not be managed yet");
}
}
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-12-23 02:11:49 UTC
(rev 10071)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-12-27 17:51:48 UTC
(rev 10072)
@@ -27,6 +27,8 @@
public interface ServerConsumer extends Consumer
{
long getID();
+
+ Object getConnectionID();
void close(boolean failed) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-12-23
02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -201,6 +201,11 @@
{
return creationTime;
}
+
+ public String getConnectionID()
+ {
+ return this.session.getConnectionID().toString();
+ }
public HandleStatus handle(final MessageReference ref) throws Exception
{
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-12-23
02:11:49 UTC (rev 10071)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -57,7 +57,7 @@
private final AddressControl addressControl;
private final ManagementService managementService;
-
+
private final JMSServerManager jmsServerManager;
// Static --------------------------------------------------------
@@ -83,7 +83,7 @@
}
// TopicControlMBean implementation ------------------------------
-
+
/* (non-Javadoc)
* @see org.hornetq.api.jms.management.JMSQueueControl#addJNDI(java.lang.String)
*/
@@ -91,13 +91,12 @@
{
jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi);
}
-
+
public String[] getJNDIBindings()
{
- return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
+ return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
}
-
public String getName()
{
return managedTopic.getName();
@@ -128,7 +127,7 @@
}
return count;
}
-
+
public long getMessagesAdded()
{
List<QueueControl> queues = getQueues(DurabilityType.ALL);
@@ -139,7 +138,7 @@
}
return count;
}
-
+
public int getDurableMessageCount()
{
return getMessageCount(DurabilityType.DURABLE);
@@ -291,7 +290,7 @@
if (queue.isDurable())
{
Pair<String, String> pair =
HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
-
.toString());
+
.toString());
clientID = pair.a;
subName = pair.b;
}
@@ -312,36 +311,46 @@
private String listSubscribersInfosAsJSON(final DurabilityType durability) throws
Exception
{
- List<QueueControl> queues = getQueues(durability);
- JSONArray array = new JSONArray();
-
- for (QueueControl queue : queues)
+ try
{
- String clientID = null;
- String subName = null;
+ List<QueueControl> queues = getQueues(durability);
+ JSONArray array = new JSONArray();
- if (queue.isDurable())
+ for (QueueControl queue : queues)
{
- Pair<String, String> pair =
HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
-
.toString());
- clientID = pair.a;
- subName = pair.b;
+ String clientID = null;
+ String subName = null;
+
+ if (queue.isDurable())
+ {
+ Pair<String, String> pair =
HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
+
.toString());
+ clientID = pair.a;
+ subName = pair.b;
+ }
+
+ String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+ JSONObject info = new JSONObject();
+
+ info.put("queueName", queue.getName());
+ info.put("clientID", clientID);
+ info.put("selector", filter);
+ info.put("name", subName);
+ info.put("durable", queue.isDurable());
+ info.put("messageCount", queue.getMessageCount());
+ info.put("deliveringCount", queue.getDeliveringCount());
+ info.put("consumers", new JSONArray(queue.listConsumersAsJSON())
);
+ array.put(info);
}
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- JSONObject info = new JSONObject();
- info.put("queueName", queue.getName());
- info.put("clientID", clientID);
- info.put("selector", filter);
- info.put("name", subName);
- info.put("durable", queue.isDurable());
- info.put("messageCount", queue.getMessageCount());
- info.put("deliveringCount", queue.getDeliveringCount());
- array.put(info);
+ return array.toString();
}
-
- return array.toString();
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return e.toString();
+ }
}
private int getMessageCount(final DurabilityType durability)
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-12-23
02:11:49 UTC (rev 10071)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -13,8 +13,6 @@
package org.hornetq.tests.integration.jms.server.management;
-import static junit.framework.Assert.assertEquals;
-
import java.util.Map;
import javax.jms.Connection;
@@ -25,10 +23,7 @@
import junit.framework.Assert;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.SubscriptionInfo;
import org.hornetq.api.jms.management.TopicControl;
@@ -145,16 +140,22 @@
{
// 1 non-durable subscriber, 2 durable subscribers
Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createConsumer(connection_1, topic);
+ MessageConsumer cons = JMSUtil.createConsumer(connection_1, topic);
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
Assert.assertEquals(1, topicControl.listNonDurableSubscriptions().length);
Assert.assertEquals(2, topicControl.listDurableSubscriptions().length);
+
+ String json = topicControl.listAllSubscriptionsAsJSON();
+ System.out.println("Json: " + json);
+ JSONArray jsonArray = new JSONArray(json);
+
+ assertEquals(3, jsonArray.length());
connection_1.close();
connection_2.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-12-23
02:11:49 UTC (rev 10071)
+++
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -36,6 +36,7 @@
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
/**
* A QueueControlTest
@@ -53,6 +54,7 @@
protected HornetQServer server;
protected ClientSession session;
+
private ServerLocator locator;
// Static --------------------------------------------------------
@@ -196,6 +198,37 @@
session.deleteQueue(queue);
}
+ public void testGetConsumerJSON() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ Assert.assertEquals(0, queueControl.getConsumerCount());
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ Assert.assertEquals(1, queueControl.getConsumerCount());
+
+
+ System.out.println("Consumers: " + queueControl.listConsumersAsJSON());
+
+ JSONArray obj = new JSONArray(queueControl.listConsumersAsJSON());
+
+ assertEquals(1, obj.length());
+
+ consumer.close();
+ Assert.assertEquals(0, queueControl.getConsumerCount());
+
+ obj = new JSONArray(queueControl.listConsumersAsJSON());
+
+ assertEquals(0, obj.length());
+
+ session.deleteQueue(queue);
+ }
+
public void testGetMessageCount() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
@@ -374,7 +407,7 @@
ClientMessage message = session.createMessage(false);
message.putIntProperty(new SimpleString("key"), intValue);
producer.send(message);
-
+
String jsonString = queueControl.listMessagesAsJSON(null);
Assert.assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-23
02:11:49 UTC (rev 10071)
+++
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-27
17:51:48 UTC (rev 10072)
@@ -261,6 +261,11 @@
return (Boolean)proxy.invokeOperation("isPaused");
}
+ public String listConsumersAsJSON() throws Exception
+ {
+ return (String)proxy.invokeOperation("listConsumersAsJSON");
+ }
+
};
}