Author: clebert.suconic(a)jboss.com
Date: 2011-04-19 13:45:34 -0400 (Tue, 19 Apr 2011)
New Revision: 10534
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
Log:
JBPAPP-6327 - Delete of paged messages through JMSQueueControl
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2011-04-19
15:52:06 UTC (rev 10533)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2011-04-19
17:45:34 UTC (rev 10534)
@@ -79,7 +79,7 @@
*/
public static Filter createFilter(final String filterStr) throws HornetQException
{
- return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr));
+ return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr == null ? null
: filterStr.trim()));
}
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-04-19
15:52:06 UTC (rev 10533)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-04-19
17:45:34 UTC (rev 10534)
@@ -898,8 +898,31 @@
acknowledge(tx, messageReference);
count++;
}
+
+ // System.out.println("QueueMemorySize before depage = " +
queueMemorySize.get());
+ while (pageIterator.hasNext())
+ {
+ PagedReference reference = pageIterator.next();
+ pageIterator.remove();
+ if (filter == null || filter.match(reference.getMessage()))
+ {
+ count++;
+ pageSubscription.ack(reference);
+ }
+ else
+ {
+ addTail(reference, false);
+ }
+ }
+
tx.commit();
+
+
+ if (filter != null && pageIterator != null)
+ {
+ scheduleDepage();
+ }
return count;
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-04-19
15:52:06 UTC (rev 10533)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-04-19
17:45:34 UTC (rev 10534)
@@ -22,6 +22,7 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
@@ -39,11 +40,11 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -1014,7 +1015,112 @@
serverManager.destroyQueue(otherQueueName);
}
+
+ public void testDeleteWithPaging() throws Exception
+ {
+ AddressSettings pagedSetting = new AddressSettings();
+ pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ pagedSetting.setPageSizeBytes(10 * 1024);
+ pagedSetting.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+
+ serverManager.createQueue(true, "pagedTest", null, true,
"/queue/pagedTest");
+
+ HornetQQueue pagedQueue =
(HornetQQueue)context.lookup("/queue/pagedTest");
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+
+ ClientMessage msg = session.createMessage(true);
+
+ msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+ for (int i = 0 ; i < 100; i++)
+ {
+ prod.send(msg);
+ }
+
+ JMSQueueControl control = createManagementControl(pagedQueue);
+
+ assertEquals(100, control.removeMessages(" "));
+
+
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+
+ assertNull(consumer.receive(300));
+
+
+ session.close();
+
+ sf.close();
+ locator.close();
+ }
+
+
+ public void testDeleteWithPagingAndFilter() throws Exception
+ {
+ AddressSettings pagedSetting = new AddressSettings();
+ pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ pagedSetting.setPageSizeBytes(10 * 1024);
+ pagedSetting.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+
+ serverManager.createQueue(true, "pagedTest", null, true,
"/queue/pagedTest");
+
+ HornetQQueue pagedQueue =
(HornetQQueue)context.lookup("/queue/pagedTest");
+
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+ for (int i = 0 ; i < 200; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+ msg.putBooleanProperty("even", i % 2 == 0);
+ prod.send(msg);
+ }
+
+ JMSQueueControl control = createManagementControl(pagedQueue);
+
+ assertEquals(100, control.removeMessages("even=true"));
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+
+
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertFalse(msg.getBooleanProperty("even").booleanValue());
+ }
+
+ assertNull(consumer.receive(300));
+
+
+ session.close();
+
+
+ sf.close();
+ locator.close();
+ }
+
public void testMoveMessageToUnknownQueue() throws Exception
{
String unknwonQueue = RandomUtil.randomString();
@@ -1070,7 +1176,7 @@
conf.setJMXManagementEnabled(true);
conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf.setFileDeploymentEnabled(false);
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
server.start();
serverManager = new JMSServerManagerImpl(server);
@@ -1106,9 +1212,14 @@
protected JMSQueueControl createManagementControl() throws Exception
{
- return ManagementControlHelper.createJMSQueueControl(queue, mbeanServer);
+ return createManagementControl(queue);
}
+ protected JMSQueueControl createManagementControl(HornetQQueue queueParameter) throws
Exception
+ {
+ return ManagementControlHelper.createJMSQueueControl(queueParameter, mbeanServer);
+ }
+
// Private -------------------------------------------------------
private Connection createConnection() throws JMSException