Author: jmesnil
Date: 2010-09-21 05:09:50 -0400 (Tue, 21 Sep 2010)
New Revision: 9703
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add messagesAdded and deliveringCount attributes to JMS DestinationControl
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -47,6 +47,16 @@
*/
int getMessageCount() throws Exception;
+ /**
+ * Returns the number of messages that this destination is currently delivering to its
consumers.
+ */
+ int getDeliveringCount();
+
+ /**
+ * Returns the number of messages added to this destination since it was created.
+ */
+ long getMessagesAdded();
+
// Operations ----------------------------------------------------
/**
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -51,11 +51,6 @@
void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc =
"Dead-letter address of the queue") String deadLetterAddress) throws Exception;
/**
- * Returns the number of messages added to this queue since it was created.
- */
- long getMessagesAdded();
-
- /**
* Returns the number of scheduled messages in this queue.
*/
long getScheduledCount();
@@ -66,11 +61,6 @@
int getConsumerCount();
/**
- * Returns the number of messages that this queue is currently delivering to its
consumers.
- */
- int getDeliveringCount();
-
- /**
* returns the selector for the queue
*/
String getSelector();
Modified:
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -118,6 +118,28 @@
return getMessageCount(DurabilityType.ALL);
}
+ public int getDeliveringCount()
+ {
+ List<QueueControl> queues = getQueues(DurabilityType.ALL);
+ int count = 0;
+ for (QueueControl queue : queues)
+ {
+ count += queue.getDeliveringCount();
+ }
+ return count;
+ }
+
+ public long getMessagesAdded()
+ {
+ List<QueueControl> queues = getQueues(DurabilityType.ALL);
+ long count = 0; // long accumulator: an int here would be silently narrowed by += and truncate the total
+ for (QueueControl queue : queues)
+ {
+ count += queue.getMessagesAdded();
+ }
+ return count;
+ }
+
public int getDurableMessageCount()
{
return getMessageCount(DurabilityType.DURABLE);
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -77,26 +77,36 @@
}
static MessageConsumer createConsumer(final Connection connection,
+ final Destination destination) throws
JMSException
+ {
+ return createConsumer(connection, destination, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ static MessageConsumer createConsumer(final Connection connection,
final Destination destination,
- final String connectorFactory) throws
JMSException
+ int ackMode) throws JMSException
{
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createConsumer(destination);
}
- public static MessageConsumer createConsumer(final Connection connection, final
Destination destination) throws JMSException
+ static TopicSubscriber createDurableSubscriber(final Connection connection,
+ final Topic topic,
+ final String clientID,
+ final String subscriptionName) throws
JMSException
{
- return JMSUtil.createConsumer(connection, destination,
InVMConnectorFactory.class.getName());
+ return createDurableSubscriber(connection, topic, clientID, subscriptionName,
Session.AUTO_ACKNOWLEDGE);
}
-
+
static TopicSubscriber createDurableSubscriber(final Connection connection,
final Topic topic,
final String clientID,
- final String subscriptionName) throws
JMSException
+ final String subscriptionName,
+ final int ackMode) throws JMSException
{
connection.setClientID(clientID);
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createDurableSubscriber(topic, subscriptionName);
}
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -13,15 +13,22 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
+
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import junit.framework.Assert;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.SubscriptionInfo;
import org.hornetq.api.jms.management.TopicControl;
@@ -406,7 +413,77 @@
{
}
}
+
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ TopicControl topicControl = createManagementControl();
+
+ Assert.assertEquals(0, topicControl.getMessagesAdded());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ Assert.assertEquals(3 * 2, topicControl.getMessagesAdded());
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic,
Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ TopicControl topicControl = createManagementControl();
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, topicControl.getDeliveringCount());
+
+ msg_1.acknowledge();
+ assertEquals(2 * 2, topicControl.getDeliveringCount());
+ msg_2.acknowledge();
+ assertEquals(1 * 2, topicControl.getDeliveringCount());
+ msg_3.acknowledge();
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-09-21
08:53:23 UTC (rev 9702)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-09-21
09:09:50 UTC (rev 9703)
@@ -13,9 +13,12 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
import static org.hornetq.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -26,6 +29,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -333,6 +337,71 @@
}
}
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+
+ assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("messagesAdded"));
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic,
Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+
+ msg_1.acknowledge();
+ assertEquals(2 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_2.acknowledge();
+ assertEquals(1 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_3.acknowledge();
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------