Author: jmesnil
Date: 2010-09-22 05:07:31 -0400 (Wed, 22 Sep 2010)
New Revision: 9711
Added:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add JMSConsumerInfo and JMSServerControl.listConsumersAsJSON()
* add ServerConsumer.getCreationTime() and isBrowseOnly() methods
Added: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
(rev 0)
+++
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Immutable description of one JMS consumer, as reported by the server.
+ * <p>
+ * Instances are created by {@link #from(String)} from the JSON serialization
+ * returned by {@link JMSServerControl#listConsumersAsJSON(String)}.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class JMSConsumerInfo
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String consumerID;
+
+   private final String connectionID;
+
+   private final String destinationName;
+
+   private final String destinationType;
+
+   private final boolean browseOnly;
+
+   private final long creationTime;
+
+   private final boolean durable;
+
+   private final String filter;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Returns an array of JMSConsumerInfo corresponding to the JSON serialization
+    * returned by {@link JMSServerControl#listConsumersAsJSON(String)}.
+    *
+    * @param jsonString a JSON array whose elements each describe one consumer
+    * @return one JMSConsumerInfo per element of the array, in the same order
+    * @throws Exception if the string cannot be parsed or an entry cannot be read
+    */
+   public static JMSConsumerInfo[] from(final String jsonString) throws Exception
+   {
+      final JSONArray entries = new JSONArray(jsonString);
+      final int count = entries.length();
+      final JMSConsumerInfo[] result = new JMSConsumerInfo[count];
+      for (int index = 0; index < count; index++)
+      {
+         result[index] = fromJSON(entries.getJSONObject(index));
+      }
+
+      return result;
+   }
+
+   /**
+    * Builds a single JMSConsumerInfo from one element of the JSON array.
+    * The keys are read in the order of the constructor parameters; only
+    * "filter" is read with a default (null) instead of a strict getter.
+    */
+   private static JMSConsumerInfo fromJSON(final JSONObject entry) throws Exception
+   {
+      final String consumerID = entry.getString("consumerID");
+      final String connectionID = entry.getString("connectionID");
+      final String destinationName = entry.getString("destinationName");
+      final String destinationType = entry.getString("destinationType");
+      final boolean browseOnly = entry.getBoolean("browseOnly");
+      final long creationTime = entry.getLong("creationTime");
+      final boolean durable = entry.getBoolean("durable");
+      final String filter = entry.optString("filter", null);
+
+      return new JMSConsumerInfo(consumerID,
+                                 connectionID,
+                                 destinationName,
+                                 destinationType,
+                                 browseOnly,
+                                 creationTime,
+                                 durable,
+                                 filter);
+   }
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Private: instances are only created through {@link #from(String)}.
+    */
+   private JMSConsumerInfo(final String consumerID,
+                           final String connectionID,
+                           final String destinationName,
+                           final String destinationType,
+                           final boolean browseOnly,
+                           final long creationTime,
+                           final boolean durable,
+                           final String filter)
+   {
+      this.consumerID = consumerID;
+      this.connectionID = connectionID;
+      this.destinationName = destinationName;
+      this.destinationType = destinationType;
+      this.browseOnly = browseOnly;
+      this.creationTime = creationTime;
+      this.durable = durable;
+      this.filter = filter;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the value of the "consumerID" key
+    */
+   public String getConsumerID()
+   {
+      return consumerID;
+   }
+
+   /**
+    * @return the value of the "connectionID" key
+    */
+   public String getConnectionID()
+   {
+      return connectionID;
+   }
+
+   /**
+    * @return the value of the "destinationName" key
+    */
+   public String getDestinationName()
+   {
+      return destinationName;
+   }
+
+   /**
+    * @return the value of the "destinationType" key
+    */
+   public String getDestinationType()
+   {
+      return destinationType;
+   }
+
+   /**
+    * @return the value of the "browseOnly" key
+    */
+   public boolean isBrowseOnly()
+   {
+      return browseOnly;
+   }
+
+   /**
+    * @return the value of the "creationTime" key
+    */
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   /**
+    * @return the value of the "durable" key
+    */
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   /**
+    * @return the value of the "filter" key, or null when the key is absent
+    */
+   public String getFilter()
+   {
+      return filter;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -228,6 +228,13 @@
@Operation(desc = "List all the connection IDs", impact =
MBeanOperationInfo.INFO)
String[] listConnectionIDs() throws Exception;
+ /**
+  * Lists all the connections connected to this server.
+  * The returned String is a JSON string containing an array of JMSConnectionInfo objects.
+  * <p>
+  * Declared with INFO impact, like the other listing operations of this interface.
+  *
+  * @return a JSON array serialized as a String
+  * @see JMSConnectionInfo#from(String)
+  */
+ @Operation(desc = "List all JMS connections", impact = MBeanOperationInfo.INFO)
String listConnectionsAsJSON() throws Exception;
/**
@@ -236,5 +243,14 @@
@Operation(desc = "List the sessions for the given connectionID", impact =
MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name =
"connectionID") String connectionID) throws Exception;
+ /**
+  * Lists all the consumers which belong to the JMS Connection specified by the connectionID.
+  * The returned String is a JSON string containing an array of JMSConsumerInfo objects.
+  * <p>
+  * Declared with INFO impact, like the other listing operations of this interface.
+  *
+  * @param connectionID the ID of the connection whose consumers are listed
+  * @return a JSON array serialized as a String
+  * @see JMSConsumerInfo#from(String)
+  */
+ @Operation(desc = "List all JMS consumers associated to a JMS Connection", impact = MBeanOperationInfo.INFO)
+ String listConsumersAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java 2010-09-21 22:27:14
UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java 2010-09-22 09:07:31
UTC (rev 9711)
@@ -157,4 +157,6 @@
void close() throws Exception;
boolean isDirectDeliver();
+
+ /**
+ * Returns the address of this queue.
+ */
+ SimpleString getAddress();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java 2010-09-21
22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -47,6 +47,10 @@
void forceDelivery(long sequence);
void setTransferring(boolean transferring);
+
+ /**
+ * Returns whether this consumer was created as browse-only.
+ */
+ boolean isBrowseOnly();
+
+ /**
+ * Returns the creation time of this consumer
+ * (ServerConsumerImpl records System.currentTimeMillis() in its constructor).
+ */
+ long getCreationTime();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-09-21
22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -14,6 +14,7 @@
package org.hornetq.core.server;
import java.util.List;
+import java.util.Set;
import javax.transaction.xa.Xid;
@@ -110,4 +111,6 @@
void close(boolean failed) throws Exception;
void setTransferring(boolean transferring);
+
+ /**
+ * Returns the consumers of this session
+ * (ServerSessionImpl returns an unmodifiable copy of its consumers).
+ */
+ Set<ServerConsumer> getServerConsumers();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-21
22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -238,6 +238,11 @@
{
return name;
}
+
+ /**
+ * Returns the value of this queue's address field.
+ */
+ public SimpleString getAddress()
+ {
+ return address;
+ }
public long getID()
{
Modified:
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -118,6 +118,8 @@
private boolean transferring = false;
+ private final long creationTime;
+
// Constructors
---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -160,6 +162,8 @@
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
+ this.creationTime = System.currentTimeMillis();
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -177,7 +181,17 @@
{
return id;
}
+
+ /**
+ * Returns the browseOnly flag of this consumer. When the flag is set, the
+ * constructor creates a BrowserDeliverer over the message queue's iterator.
+ */
+ public boolean isBrowseOnly()
+ {
+ return browseOnly;
+ }
+ /**
+ * Returns the creation time of this consumer, i.e. the value of
+ * System.currentTimeMillis() recorded by the constructor.
+ */
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
Modified:
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -16,6 +16,7 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -232,6 +233,11 @@
{
return remotingConnection.getID();
}
+
+ /**
+  * Returns the consumers of this session as an unmodifiable set.
+  * The set is a copy of the current values of the consumers map, so later
+  * changes to the session's consumers are not reflected in it.
+  */
+ public Set<ServerConsumer> getServerConsumers()
+ {
+    return Collections.unmodifiableSet(new HashSet<ServerConsumer>(consumers.values()));
+ }
public void removeConsumer(final long consumerID) throws Exception
{
Modified:
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -36,8 +36,13 @@
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
@@ -90,6 +95,38 @@
}
return trimmed;
}
+
+ /**
+  * Maps a core address to the JMS destination it represents.
+  * <p>
+  * The address is matched against the JMS address prefixes in this order:
+  * queue, temporary queue, topic, temporary topic.
+  *
+  * @param coreAddress the core address to inspect
+  * @return a 2-element array { destination name, destination type } where the name is
+  *         the address without its prefix and the type is one of "queue", "tempqueue",
+  *         "topic" or "temptopic"; or null if the address has none of the JMS prefixes
+  */
+ private static String[] determineJMSDestination(String coreAddress)
+ {
+    // { address prefix, destination type } pairs, checked in declaration order
+    final String[][] prefixesAndTypes = {
+       { HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX, "queue" },
+       { HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX, "tempqueue" },
+       { HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX, "topic" },
+       { HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX, "temptopic" }
+    };
+
+    for (String[] prefixAndType : prefixesAndTypes)
+    {
+       final String prefix = prefixAndType[0];
+       if (coreAddress.startsWith(prefix))
+       {
+          return new String[] { coreAddress.substring(prefix.length()), prefixAndType[1] };
+       }
+    }
+
+    // not related to JMS
+    return null;
+ }
private static List<Pair<TransportConfiguration, TransportConfiguration>>
convertToConnectorPairs(final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
@@ -598,7 +635,74 @@
blockOnIO();
}
}
-
+
+ /**
+  * Lists the JMS consumers of the connection identified by connectionID.
+  * <p>
+  * Every consumer of every session of the matching connection is serialized as one
+  * JSON object with the keys consumerID, connectionID, queueName, browseOnly,
+  * creationTime, destinationName, destinationType and durable, plus filter when the
+  * consumer's queue has a filter. Consumers whose queue address is not a JMS address
+  * are skipped.
+  *
+  * @param connectionID the ID of the connection; a null or unknown ID yields an empty array
+  * @return the JSON array serialized as a String
+  */
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+    checkStarted();
+
+    clearIO();
+
+    try
+    {
+       JSONArray array = new JSONArray();
+
+       Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+       for (RemotingConnection connection : connections)
+       {
+          // compare from the connection's side so that a null connectionID simply matches nothing
+          if (!connection.getID().toString().equals(connectionID))
+          {
+             continue;
+          }
+
+          List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+          for (ServerSession session : sessions)
+          {
+             for (ServerConsumer consumer : session.getServerConsumers())
+             {
+                // resolve the destination first: consumers not related to JMS are
+                // skipped before any JSON is built for them
+                String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString());
+                if (destinationInfo == null)
+                {
+                   continue;
+                }
+
+                String queueName = consumer.getQueue().getName().toString();
+
+                JSONObject obj = new JSONObject();
+                obj.put("consumerID", consumer.getID());
+                obj.put("connectionID", connectionID);
+                obj.put("queueName", queueName);
+                obj.put("browseOnly", consumer.isBrowseOnly());
+                obj.put("creationTime", consumer.getCreationTime());
+                // JMS consumer with message filter use the queue's filter
+                Filter queueFilter = consumer.getQueue().getFilter();
+                if (queueFilter != null)
+                {
+                   obj.put("filter", queueFilter.getFilterString().toString());
+                }
+                obj.put("destinationName", destinationInfo[0]);
+                obj.put("destinationType", destinationInfo[1]);
+                // only consumers on a (non temporary) topic are checked for durability
+                obj.put("durable", destinationInfo[1].equals("topic") && isDurableSubscriptionQueue(queueName));
+                array.put(obj);
+             }
+          }
+       }
+       return array.toString();
+    }
+    finally
+    {
+       blockOnIO();
+    }
+ }
+
+ /**
+  * Tells whether queueName can be decomposed as the queue name of a durable subscription,
+  * i.e. whether HornetQDestination.decomposeQueueNameForDurableSubscription() accepts it
+  * without throwing an IllegalArgumentException.
+  */
+ private static boolean isDurableSubscriptionQueue(final String queueName)
+ {
+    try
+    {
+       HornetQDestination.decomposeQueueNameForDurableSubscription(queueName);
+       return true;
+    }
+    catch (IllegalArgumentException e)
+    {
+       // the name does not have the shape of a durable subscription queue name
+       return false;
+    }
+ }
+
public String[] listSessions(final String connectionID) throws Exception
{
checkStarted();
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -21,11 +21,19 @@
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSConnectionInfo;
+import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -36,6 +44,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -150,7 +159,195 @@
{
doListConnectionsAsJSON(InVMAcceptorFactory.class.getName(),
InVMConnectorFactory.class.getName());
}
+
+ /**
+  * Checks listConsumersAsJSON() successively for a regular queue consumer,
+  * a queue browser and a consumer with a filter on a temporary topic.
+  */
+ public void testListConsumersAsJSON() throws Exception
+ {
+    String queueName = RandomUtil.randomString();
+    Connection connection = null;
+    try
+    {
+       startHornetQServer(NettyAcceptorFactory.class.getName());
+       serverManager.createQueue(false, queueName, null, true, queueName);
+       Queue queue = HornetQJMSClient.createQueue(queueName);
+
+       JMSServerControl control = createManagementControl();
+
+       long startTime = System.currentTimeMillis();
+
+       String jsonStr = control.listConnectionsAsJSON();
+       assertNotNull(jsonStr);
+       JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+       assertEquals(0, infos.length);
+
+       ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+                                                     JMSServerControl2Test.CONNECTION_TTL,
+                                                     JMSServerControl2Test.PING_PERIOD);
+       connection = cf1.createConnection();
+       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       TemporaryTopic temporaryTopic = session.createTemporaryTopic();
+
+       // create a regular message consumer
+       MessageConsumer consumer = session.createConsumer(queue);
+
+       jsonStr = control.listConnectionsAsJSON();
+       assertNotNull(jsonStr);
+       infos = JMSConnectionInfo.from(jsonStr);
+       assertEquals(1, infos.length);
+       String connectionID = infos[0].getConnectionID();
+
+       String consJsonStr = control.listConsumersAsJSON(connectionID);
+       assertNotNull(consJsonStr);
+       JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+       assertEquals(1, consumerInfos.length);
+       JMSConsumerInfo consumerInfo = consumerInfos[0];
+       assertNotNull(consumerInfo.getConsumerID());
+       assertEquals(connectionID, consumerInfo.getConnectionID());
+       assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+       assertEquals("queue", consumerInfo.getDestinationType());
+       assertNull(consumerInfo.getFilter());
+       assertEquals(false, consumerInfo.isBrowseOnly());
+       assertEquals(false, consumerInfo.isDurable());
+       assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+       consumer.close();
+
+       consJsonStr = control.listConsumersAsJSON(connectionID);
+       assertNotNull(consJsonStr);
+       consumerInfos = JMSConsumerInfo.from(consJsonStr);
+       assertEquals(0, consumerInfos.length);
+
+       // create a queue browser
+       QueueBrowser browser = session.createBrowser(queue);
+       // the server resources are created when the browser starts enumerating
+       browser.getEnumeration();
+
+       consJsonStr = control.listConsumersAsJSON(connectionID);
+       assertNotNull(consJsonStr);
+       consumerInfos = JMSConsumerInfo.from(consJsonStr);
+       assertEquals(1, consumerInfos.length);
+       consumerInfo = consumerInfos[0];
+       assertNotNull(consumerInfo.getConsumerID());
+       assertEquals(connectionID, consumerInfo.getConnectionID());
+       assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+       assertEquals("queue", consumerInfo.getDestinationType());
+       assertNull(consumerInfo.getFilter());
+       assertEquals(true, consumerInfo.isBrowseOnly());
+       assertEquals(false, consumerInfo.isDurable());
+       assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+       browser.close();
+
+       // create a regular consumer w/ filter on a temp topic
+       String filter = "color = 'red'";
+       consumer = session.createConsumer(temporaryTopic, filter);
+
+       consJsonStr = control.listConsumersAsJSON(connectionID);
+       assertNotNull(consJsonStr);
+       consumerInfos = JMSConsumerInfo.from(consJsonStr);
+       assertEquals(1, consumerInfos.length);
+       consumerInfo = consumerInfos[0];
+       assertNotNull(consumerInfo.getConsumerID());
+       assertEquals(connectionID, consumerInfo.getConnectionID());
+       assertEquals(temporaryTopic.getTopicName(), consumerInfo.getDestinationName());
+       assertEquals("temptopic", consumerInfo.getDestinationType());
+       assertEquals(filter, consumerInfo.getFilter());
+       assertEquals(false, consumerInfo.isBrowseOnly());
+       assertEquals(false, consumerInfo.isDurable());
+       assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+       consumer.close();
+    }
+    finally
+    {
+       try
+       {
+          // close the connection even when an assertion failed, so that no consumer
+          // is left on the queue when it is destroyed below
+          if (connection != null)
+          {
+             connection.close();
+          }
+       }
+       finally
+       {
+          if (serverManager != null)
+          {
+             serverManager.destroyQueue(queueName);
+             serverManager.stop();
+          }
+
+          if (server != null)
+          {
+             server.stop();
+          }
+       }
+    }
+ }
+
+ /**
+  * Checks listConsumersAsJSON() for a durable subscriber on a topic:
+  * the consumer must be reported with the "topic" type and as durable.
+  */
+ public void testListConsumersAsJSON2() throws Exception
+ {
+    String topicName = RandomUtil.randomString();
+    String clientID = RandomUtil.randomString();
+    String subName = RandomUtil.randomString();
+    Connection connection = null;
+
+    try
+    {
+       startHornetQServer(NettyAcceptorFactory.class.getName());
+       serverManager.createTopic(false, topicName, topicName);
+       Topic topic = HornetQJMSClient.createTopic(topicName);
+
+       JMSServerControl control = createManagementControl();
+
+       long startTime = System.currentTimeMillis();
+
+       String jsonStr = control.listConnectionsAsJSON();
+       assertNotNull(jsonStr);
+       JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+       assertEquals(0, infos.length);
+
+       ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+                                                     JMSServerControl2Test.CONNECTION_TTL,
+                                                     JMSServerControl2Test.PING_PERIOD);
+       connection = cf1.createConnection();
+       connection.setClientID(clientID);
+       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+       // create a durable subscriber
+       MessageConsumer consumer = session.createDurableSubscriber(topic, subName);
+
+       jsonStr = control.listConnectionsAsJSON();
+       assertNotNull(jsonStr);
+       infos = JMSConnectionInfo.from(jsonStr);
+       assertEquals(1, infos.length);
+       String connectionID = infos[0].getConnectionID();
+
+       String consJsonStr = control.listConsumersAsJSON(connectionID);
+       assertNotNull(consJsonStr);
+       JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+       assertEquals(1, consumerInfos.length);
+       JMSConsumerInfo consumerInfo = consumerInfos[0];
+       assertNotNull(consumerInfo.getConsumerID());
+       assertEquals(connectionID, consumerInfo.getConnectionID());
+       assertEquals(topic.getTopicName(), consumerInfo.getDestinationName());
+       assertEquals("topic", consumerInfo.getDestinationType());
+       assertNull(consumerInfo.getFilter());
+       assertEquals(false, consumerInfo.isBrowseOnly());
+       assertEquals(true, consumerInfo.isDurable());
+       assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+       consumer.close();
+    }
+    finally
+    {
+       try
+       {
+          // close the connection even when an assertion failed, so that no consumer
+          // is left on the topic when it is destroyed below
+          if (connection != null)
+          {
+             connection.close();
+          }
+       }
+       finally
+       {
+          if (serverManager != null)
+          {
+             serverManager.destroyTopic(topicName);
+             serverManager.stop();
+          }
+
+          if (server != null)
+          {
+             server.stop();
+          }
+       }
+    }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -246,7 +443,7 @@
{
assertNotNull(info.getConnectionID());
assertNotNull(info.getClientAddress());
- assertTrue(startTime < info.getCreationTime() &&
info.getCreationTime() < System.currentTimeMillis());
+ assertTrue(startTime <= info.getCreationTime() &&
info.getCreationTime() <= System.currentTimeMillis());
}
ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory,
@@ -262,7 +459,7 @@
{
assertNotNull(info.getConnectionID());
assertNotNull(info.getClientAddress());
- assertTrue(startTime < info.getCreationTime() &&
info.getCreationTime() < System.currentTimeMillis());
+ assertTrue(startTime <= info.getCreationTime() &&
info.getCreationTime() <= System.currentTimeMillis());
}
connection.close();
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -238,6 +238,11 @@
{
return (String)proxy.invokeOperation("listConnectionsAsJSON");
}
+
+ /**
+  * Invokes the "listConsumersAsJSON" operation through the proxy,
+  * passing connectionID as its only argument, and returns the result as a String.
+  */
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+    final Object result = proxy.invokeOperation("listConsumersAsJSON", connectionID);
+    return (String)result;
+ }
public String[] listRemoteAddresses() throws Exception
{
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-21
22:27:14 UTC (rev 9710)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-22
09:07:31 UTC (rev 9711)
@@ -348,6 +348,12 @@
return name;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getAddress()
+ */
+ public SimpleString getAddress()
+ {
+ // Stub: this fake queue does not provide an address, callers always get null.
+ return null;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getID()
*/