[hornetq-commits] JBoss hornetq SVN: r8786 - in trunk: src/main/org/hornetq/core/server and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 11 07:15:25 EST 2010


Author: jmesnil
Date: 2010-01-11 07:15:25 -0500 (Mon, 11 Jan 2010)
New Revision: 8786

Modified:
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-265: remove Queue#list() synchronized method

* removed Queue#list(Filter) method and use Queue#iterator() instead
* added changeReferencesPriority() and sendMessagesToDeadLetterAddress() methods to Queue
  to perform operations on messages matching a filter in a single iteration

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2010-01-11 12:15:25 UTC (rev 8786)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.management.impl;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -359,15 +361,18 @@
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
-         List<MessageReference> refs = queue.list(filter);
-         Map<String, Object>[] messages = new Map[refs.size()];
-         int i = 0;
-         for (MessageReference ref : refs)
+         List<Map<String, Object>> messages = new ArrayList<Map<String,Object>>();
+         Iterator<MessageReference> iterator = queue.iterator();
+         while (iterator.hasNext())
          {
-            Message message = ref.getMessage();
-            messages[i++] = message.toMap();
+            MessageReference ref = (MessageReference)iterator.next();
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               Message message = ref.getMessage();
+               messages.add(message.toMap());
+            }
          }
-         return messages;
+         return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
       }
       catch (HornetQException e)
       {
@@ -398,8 +403,24 @@
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
-         List<MessageReference> refs = queue.list(filter);
-         return refs.size();
+         if (filter == null)
+         {
+            return getMessageCount();
+         }
+         else
+         {
+            Iterator<MessageReference> iterator = queue.iterator();
+            int count = 0;
+            while (iterator.hasNext())
+            {
+               MessageReference ref = (MessageReference)iterator.next();
+               if (filter.match(ref.getMessage()))
+               {
+                  count ++;
+               }
+            }
+            return count;
+         }
       }
       finally
       {
@@ -523,14 +544,7 @@
       {
          Filter filter = FilterImpl.createFilter(filterStr);
 
-         List<MessageReference> refs = queue.list(filter);
-
-         for (MessageReference ref : refs)
-         {
-            sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
-         }
-
-         return refs.size();
+         return queue.sendMessagesToDeadLetterAddress(filter);
       }
       finally
       {
@@ -543,10 +557,7 @@
       clearIO();
       try
       {
-
-         boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-
-         return retValue;
+         return queue.sendMessageToDeadLetterAddress(messageID);
       }
       finally
       {
@@ -559,16 +570,14 @@
       clearIO();
       try
       {
-         Filter filter = FilterImpl.createFilter(filterStr);
-
-         List<MessageReference> refs = queue.list(filter);
-
-         for (MessageReference ref : refs)
+         if (newPriority < 0 || newPriority > 9)
          {
-            changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+            throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+                                               ". It must be between 0 and 9 (both included)");
          }
+         Filter filter = FilterImpl.createFilter(filterStr);
 
-         return refs.size();
+         return queue.changeReferencesPriority(filter, (byte)newPriority);
       }
       finally
       {

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2010-01-11 12:15:25 UTC (rev 8786)
@@ -65,8 +65,6 @@
 
    void deliverAsync(Executor executor);
 
-   List<MessageReference> list(Filter filter);
-
    int getMessageCount();
 
    int getDeliveringCount();
@@ -108,8 +106,12 @@
 
    boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
 
+   int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
+
    boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
 
+   int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;
+
    boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
 
    int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
@@ -151,4 +153,5 @@
     */
    boolean isPaused();
 
+
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 12:15:25 UTC (rev 8786)
@@ -407,28 +407,6 @@
       };
    }
 
-   public synchronized List<MessageReference> list(final Filter filter)
-   {
-      if (filter == null)
-      {
-         return new ArrayList<MessageReference>(messageReferences.getAll());
-      }
-      else
-      {
-         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
-
-         for (MessageReference ref : messageReferences.getAll())
-         {
-            if (filter.match(ref.getMessage()))
-            {
-               list.add(ref);
-            }
-         }
-
-         return list;
-      }
-   }
-
    public MessageReference removeReferenceWithID(final long id) throws Exception
    {
       Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -759,7 +737,26 @@
       }
       return false;
    }
+   
+   public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+   {
+      int count = 0;
+      Iterator<MessageReference> iter = messageReferences.iterator();
 
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            sendToDeadLetterAddress(ref);
+            iter.remove();
+            count ++;
+         }
+      }
+      return count;
+   }
+
    public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
@@ -829,7 +826,26 @@
 
       return false;
    }
+   
+   public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
+   {
+      Iterator<MessageReference> iter = messageReferences.iterator();
 
+      int count = 0;
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            count ++;
+            iter.remove();
+            ref.getMessage().setPriority(newPriority);
+            addLast(ref);
+         }
+      }
+      return count;
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-01-11 12:15:25 UTC (rev 8786)
@@ -125,6 +125,15 @@
       // TODO Auto-generated method stub
       return false;
    }
+   
+/* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#changeReferencesPriority(org.hornetq.core.filter.Filter, byte)
+    */
+   public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
@@ -457,6 +466,15 @@
       // TODO Auto-generated method stub
       return false;
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#sendMessagesToDeadLetterAddress(org.hornetq.core.filter.Filter)
+    */
+   public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2010-01-11 12:15:25 UTC (rev 8786)
@@ -14,6 +14,7 @@
 package org.hornetq.tests.unit.core.server.impl;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -915,7 +916,7 @@
       FakeConsumer consumer = new FakeConsumer(filter);
    }
 
-   public void testList()
+   public void testIterator()
    {
       QueueImpl queue = new QueueImpl(1,
                                   QueueImplTest.address1,
@@ -943,54 +944,13 @@
 
       Assert.assertEquals(numMessages, queue.getMessageCount());
 
-      List<MessageReference> list = queue.list(null);
-
-      assertRefListsIdenticalRefs(refs, list);
-   }
-
-   public void testListWithFilter()
-   {
-      QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
-
-      final int numMessages = 20;
-
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-
-      for (int i = 0; i < numMessages; i++)
+      Iterator<MessageReference> iterator = queue.iterator();      
+      List<MessageReference> list = new ArrayList<MessageReference>();
+      while (iterator.hasNext())
       {
-         MessageReference ref = generateReference(queue, i);
-
-         if (i % 2 == 0)
-         {
-            ref.getMessage().putStringProperty(new SimpleString("god"), new SimpleString("dog"));
-         }
-
-         queue.addLast(ref);
-
-         refs.add(ref);
+         list.add(iterator.next());
       }
-
-      Assert.assertEquals(numMessages, queue.getMessageCount());
-
-      Filter filter = new FakeFilter("god", "dog");
-
-      List<MessageReference> list = queue.list(filter);
-
-      Assert.assertEquals(numMessages / 2, list.size());
-
-      for (int i = 0; i < numMessages; i += 2)
-      {
-         Assert.assertEquals(refs.get(i), list.get(i / 2));
-      }
+      assertRefListsIdenticalRefs(refs, list);
    }
 
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception



More information about the hornetq-commits mailing list