Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 09:39:41 -0400 (Tue, 21 Jun 2011)
New Revision: 10864
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
Adding a test
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-06-21
10:13:05 UTC (rev 10863)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-06-21
13:39:41 UTC (rev 10864)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.management;
+import java.util.LinkedList;
import java.util.Map;
import junit.framework.Assert;
@@ -888,6 +889,55 @@
session.deleteQueue(queue);
}
+ public void testRemoveMessage2() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ // send messages on queue
+
+ for (int i = 0 ; i < 100; i++)
+ {
+
+ ClientMessage msg = session.createMessage(false);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+ }
+
+ ClientConsumer cons = session.createConsumer(queue);
+ session.start();
+ LinkedList<ClientMessage> msgs = new LinkedList<ClientMessage>();
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage msg = cons.receive(1000);
+ msgs.add(msg);
+ }
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(100, queueControl.getMessageCount());
+
+ // the message IDs are set on the server
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ Assert.assertEquals(50, messages.length);
+ assertEquals(50, ((Integer)messages[0].get("count")).intValue());
+ long messageID = (Long)messages[0].get("messageID");
+
+ // delete 1st message
+ boolean deleted = queueControl.removeMessage(messageID);
+ Assert.assertTrue(deleted);
+ Assert.assertEquals(99, queueControl.getMessageCount());
+
+ cons.close();
+
+ // check there is a single message to consume from queue
+ ManagementTestBase.consumeMessages(99, session, queue);
+
+ session.deleteQueue(queue);
+ }
+
public void testCountMessagesWithFilter() throws Exception
{
SimpleString key = new SimpleString("key");
@@ -1477,6 +1527,7 @@
locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnNonDurableSend(true);
+ locator.setConsumerWindowSize(0);
ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(false, true, false);
session.start();