[hornetq-commits] JBoss hornetq SVN: r10534 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Apr 19 13:45:35 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-19 13:45:34 -0400 (Tue, 19 Apr 2011)
New Revision: 10534

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
Log:
JBPAPP-6327 - Delete of paged messages through JMSQueueControl

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java	2011-04-19 15:52:06 UTC (rev 10533)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/filter/impl/FilterImpl.java	2011-04-19 17:45:34 UTC (rev 10534)
@@ -79,7 +79,7 @@
     */
    public static Filter createFilter(final String filterStr) throws HornetQException
    {
-      return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr));
+      return FilterImpl.createFilter(SimpleString.toSimpleString(filterStr == null ? null : filterStr.trim()));
    }
 
    /**

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-04-19 15:52:06 UTC (rev 10533)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-04-19 17:45:34 UTC (rev 10534)
@@ -898,8 +898,31 @@
             acknowledge(tx, messageReference);
             count++;
          }
+         
+         // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+         while (pageIterator.hasNext())
+         {
+            PagedReference reference = pageIterator.next();
+            pageIterator.remove();
 
+            if (filter == null || filter.match(reference.getMessage()))
+            {
+               count++;
+               pageSubscription.ack(reference);
+            }
+            else
+            {
+               addTail(reference, false);
+            }
+         }
+
          tx.commit();
+         
+         
+         if (filter != null && pageIterator != null)
+         {
+            scheduleDepage();
+         }
 
          return count;
       }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2011-04-19 15:52:06 UTC (rev 10533)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2011-04-19 17:45:34 UTC (rev 10534)
@@ -22,6 +22,7 @@
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.Context;
 
@@ -39,11 +40,11 @@
 import org.hornetq.api.jms.JMSFactoryType;
 import org.hornetq.api.jms.management.JMSQueueControl;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQDestination;
@@ -1014,7 +1015,112 @@
 
       serverManager.destroyQueue(otherQueueName);
    }
+   
+   public void testDeleteWithPaging() throws Exception
+   {
+      AddressSettings pagedSetting = new AddressSettings();
+      pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      pagedSetting.setPageSizeBytes(10 * 1024);
+      pagedSetting.setMaxSizeBytes(100 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+      
+      serverManager.createQueue(true, "pagedTest", null, true, "/queue/pagedTest");
+      
+      HornetQQueue pagedQueue = (HornetQQueue)context.lookup("/queue/pagedTest");
+      
 
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+      
+      ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+      
+      ClientMessage msg = session.createMessage(true);
+      
+      msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+      for (int i = 0 ; i < 100; i++)
+      {
+         prod.send(msg);
+      }
+      
+      JMSQueueControl control = createManagementControl(pagedQueue);
+      
+      assertEquals(100, control.removeMessages("     "));
+      
+      
+      
+      session.start();
+      
+      ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+      
+      assertNull(consumer.receive(300));
+
+      
+      session.close();
+
+      sf.close();
+      locator.close();
+   }
+
+   
+   public void testDeleteWithPagingAndFilter() throws Exception
+   {
+      AddressSettings pagedSetting = new AddressSettings();
+      pagedSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      pagedSetting.setPageSizeBytes(10 * 1024);
+      pagedSetting.setMaxSizeBytes(100 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", pagedSetting);
+      
+      serverManager.createQueue(true, "pagedTest", null, true, "/queue/pagedTest");
+      
+      HornetQQueue pagedQueue = (HornetQQueue)context.lookup("/queue/pagedTest");
+      
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+      
+      ClientProducer prod = session.createProducer(pagedQueue.getAddress());
+      for (int i = 0 ; i < 200; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.getBodyBuffer().writeBytes(new byte[90 * 1024]);
+         msg.putBooleanProperty("even", i % 2 == 0);
+         prod.send(msg);
+      }
+      
+      JMSQueueControl control = createManagementControl(pagedQueue);
+      
+      assertEquals(100, control.removeMessages("even=true"));
+      
+      session.start();
+      
+      ClientConsumer consumer = session.createConsumer(pagedQueue.getAddress());
+      
+      
+      
+      for (int i = 0 ; i < 100; i++)
+      {
+         ClientMessage msg = consumer.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         assertFalse(msg.getBooleanProperty("even").booleanValue());
+      }
+      
+      assertNull(consumer.receive(300));
+
+      
+      session.close();
+
+
+      sf.close();
+      locator.close();
+   }
+
    public void testMoveMessageToUnknownQueue() throws Exception
    {
       String unknwonQueue = RandomUtil.randomString();
@@ -1070,7 +1176,7 @@
       conf.setJMXManagementEnabled(true);
       conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
       conf.setFileDeploymentEnabled(false);
-      server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+      server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
       server.start();
 
       serverManager = new JMSServerManagerImpl(server);
@@ -1106,9 +1212,14 @@
 
    protected JMSQueueControl createManagementControl() throws Exception
    {
-      return ManagementControlHelper.createJMSQueueControl(queue, mbeanServer);
+      return createManagementControl(queue);
    }
 
+   protected JMSQueueControl createManagementControl(HornetQQueue queueParameter) throws Exception
+   {
+      return ManagementControlHelper.createJMSQueueControl(queueParameter, mbeanServer);
+   }
+
    // Private -------------------------------------------------------
 
    private Connection createConnection() throws JMSException



More information about the hornetq-commits mailing list