Author: jmesnil
Date: 2010-01-11 07:15:25 -0500 (Mon, 11 Jan 2010)
New Revision: 8786
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-265: remove the synchronized Queue#list(Filter) method
* removed the Queue#list(Filter) method; callers now use Queue#iterator() instead
* added changeReferencesPriority() and sendMessagesToDeadLetterAddress() methods to Queue
  to perform operations on messages matching a filter in a single iteration
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -359,15 +361,18 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- Map<String, Object>[] messages = new Map[refs.size()];
- int i = 0;
- for (MessageReference ref : refs)
+ List<Map<String, Object>> messages = new
ArrayList<Map<String,Object>>();
+ Iterator<MessageReference> iterator = queue.iterator();
+ while (iterator.hasNext())
{
- Message message = ref.getMessage();
- messages[i++] = message.toMap();
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ Message message = ref.getMessage();
+ messages.add(message.toMap());
+ }
}
- return messages;
+ return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
}
catch (HornetQException e)
{
@@ -398,8 +403,24 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- return refs.size();
+ if (filter == null)
+ {
+ return getMessageCount();
+ }
+ else
+ {
+ Iterator<MessageReference> iterator = queue.iterator();
+ int count = 0;
+ while (iterator.hasNext())
+ {
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter.match(ref.getMessage()))
+ {
+ count ++;
+ }
+ }
+ return count;
+ }
}
finally
{
@@ -523,14 +544,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
-
- for (MessageReference ref : refs)
- {
- sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
- }
-
- return refs.size();
+ return queue.sendMessagesToDeadLetterAddress(filter);
}
finally
{
@@ -543,10 +557,7 @@
clearIO();
try
{
-
- boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-
- return retValue;
+ return queue.sendMessageToDeadLetterAddress(messageID);
}
finally
{
@@ -559,16 +570,14 @@
clearIO();
try
{
- Filter filter = FilterImpl.createFilter(filterStr);
-
- List<MessageReference> refs = queue.list(filter);
-
- for (MessageReference ref : refs)
+ if (newPriority < 0 || newPriority > 9)
{
- changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ throw new IllegalArgumentException("invalid newPriority value: " +
newPriority +
+ ". It must be between 0 and 9 (both
included)");
}
+ Filter filter = FilterImpl.createFilter(filterStr);
- return refs.size();
+ return queue.changeReferencesPriority(filter, (byte)newPriority);
}
finally
{
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -65,8 +65,6 @@
void deliverAsync(Executor executor);
- List<MessageReference> list(Filter filter);
-
int getMessageCount();
int getDeliveringCount();
@@ -108,8 +106,12 @@
boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
+ int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
+
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
+ int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;
+
boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
@@ -151,4 +153,5 @@
*/
boolean isPaused();
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -407,28 +407,6 @@
};
}
- public synchronized List<MessageReference> list(final Filter filter)
- {
- if (filter == null)
- {
- return new ArrayList<MessageReference>(messageReferences.getAll());
- }
- else
- {
- ArrayList<MessageReference> list = new
ArrayList<MessageReference>();
-
- for (MessageReference ref : messageReferences.getAll())
- {
- if (filter.match(ref.getMessage()))
- {
- list.add(ref);
- }
- }
-
- return list;
- }
- }
-
public MessageReference removeReferenceWithID(final long id) throws Exception
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -759,7 +737,26 @@
}
return false;
}
+
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ {
+ int count = 0;
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ count ++;
+ }
+ }
+ return count;
+ }
+
public boolean moveReference(final long messageID, final SimpleString toAddress)
throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -829,7 +826,26 @@
return false;
}
+
+ public int changeReferencesPriority(final Filter filter, final byte newPriority)
throws Exception
+ {
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ int count = 0;
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ count ++;
+ iter.remove();
+ ref.getMessage().setPriority(newPriority);
+ addLast(ref);
+ }
+ }
+ return count;
+ }
+
// Public
// -----------------------------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -125,6 +125,15 @@
// TODO Auto-generated method stub
return false;
}
+
+/* (non-Javadoc)
+ * @see
org.hornetq.core.server.Queue#changeReferencesPriority(org.hornetq.core.filter.Filter,
byte)
+ */
+ public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
@@ -457,6 +466,15 @@
// TODO Auto-generated method stub
return false;
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.Queue#sendMessagesToDeadLetterAddress(org.hornetq.core.filter.Filter)
+ */
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -14,6 +14,7 @@
package org.hornetq.tests.unit.core.server.impl;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -915,7 +916,7 @@
FakeConsumer consumer = new FakeConsumer(filter);
}
- public void testList()
+ public void testIterator()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
@@ -943,54 +944,13 @@
Assert.assertEquals(numMessages, queue.getMessageCount());
- List<MessageReference> list = queue.list(null);
-
- assertRefListsIdenticalRefs(refs, list);
- }
-
- public void testListWithFilter()
- {
- QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
-
- final int numMessages = 20;
-
- List<MessageReference> refs = new ArrayList<MessageReference>();
-
- for (int i = 0; i < numMessages; i++)
+ Iterator<MessageReference> iterator = queue.iterator();
+ List<MessageReference> list = new ArrayList<MessageReference>();
+ while (iterator.hasNext())
{
- MessageReference ref = generateReference(queue, i);
-
- if (i % 2 == 0)
- {
- ref.getMessage().putStringProperty(new SimpleString("god"), new
SimpleString("dog"));
- }
-
- queue.addLast(ref);
-
- refs.add(ref);
+ list.add(iterator.next());
}
-
- Assert.assertEquals(numMessages, queue.getMessageCount());
-
- Filter filter = new FakeFilter("god", "dog");
-
- List<MessageReference> list = queue.list(filter);
-
- Assert.assertEquals(numMessages / 2, list.size());
-
- for (int i = 0; i < numMessages; i += 2)
- {
- Assert.assertEquals(refs.get(i), list.get(i / 2));
- }
+ assertRefListsIdenticalRefs(refs, list);
}
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception