Author: ataylor
Date: 2009-11-12 08:00:07 -0500 (Thu, 12 Nov 2009)
New Revision: 8270
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
fix to reset iterators when consumer is busy + some refactoring
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 05:23:09 UTC
(rev 8269)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:00:07 UTC
(rev 8270)
@@ -88,6 +88,8 @@
private final PriorityLinkedList<MessageReference> messageReferences = new
PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+ private final MessageHandler globalHandler = new NullFilterMessageHandler();
+
private final ConcurrentSet<MessageReference> expiringMessageReferences = new
ConcurrentHashSet<MessageReference>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
@@ -128,7 +130,7 @@
private final Set<Consumer> consumers = new HashSet<Consumer>();
- private final Map<Consumer, Iterator<MessageReference>> iterators = new
HashMap<Consumer, Iterator<MessageReference>>();
+ private final Map<Consumer, MessageHandler> messageHandlers = new
HashMap<Consumer, MessageHandler>();
private final ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
@@ -278,7 +280,7 @@
consumers.add(consumer);
if (consumer.getFilter() != null)
{
- iterators.put(consumer, messageReferences.iterator());
+ messageHandlers.put(consumer, new
FilterMessageHandler(messageReferences.iterator()));
}
}
@@ -292,8 +294,9 @@
}
consumers.remove(consumer);
- iterators.remove(consumer);
+ messageHandlers.remove(consumer);
+
if (removed)
{
for (SimpleString groupID : groups.keySet())
@@ -1035,8 +1038,6 @@
MessageReference reference;
- Iterator<MessageReference> iterator = null;
-
// TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
@@ -1046,31 +1047,13 @@
{
consumer = distributionPolicy.getNextConsumer();
- iterator = iterators.get(consumer);
-
- if (iterator == null)
+ MessageHandler handler = messageHandlers.get(consumer);
+ if(handler == null)
{
- reference = messageReferences.peekFirst();
+ handler = globalHandler;
}
- else
- {
- if (iterator.hasNext())
- {
- reference = iterator.next();
- }
- else
- {
- reference = null;
- if (consumer.getFilter() != null)
- {
- // we have iterated on the whole queue for
- // messages which matches the consumer filter.
- // we reset its iterator in case new messages are added to the queue
- iterators.put(consumer, messageReferences.iterator());
- }
- }
- }
+ reference = handler.peek(consumer);
if (reference == null)
{
@@ -1095,14 +1078,7 @@
if (reference.getMessage().isExpired())
{
// We expire messages on the server too
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
+ handler.remove();
reference.handled();
@@ -1134,18 +1110,14 @@
if (status == HandleStatus.HANDLED)
{
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
+ handler.remove();
}
else if (status == HandleStatus.BUSY)
{
busyConsumers.add(consumer);
+
+ handler.reset();
+
if (groupID != null || busyConsumers.size() == totalConsumers)
{
// when all consumers are busy, we stop
@@ -1159,8 +1131,6 @@
{
groups.remove(consumer);
}
-
- continue;
}
}
}
@@ -1539,4 +1509,73 @@
{
return paused;
}
+
+ interface MessageHandler
+ {
+ MessageReference peek(Consumer consumer);
+
+ void remove();
+
+ void reset();
+ }
+
+ class FilterMessageHandler implements MessageHandler
+ {
+ private Iterator<MessageReference> iterator;
+
+ public FilterMessageHandler(Iterator<MessageReference> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ public MessageReference peek(Consumer consumer)
+ {
+ MessageReference reference;
+ if (iterator.hasNext())
+ {
+ reference = iterator.next();
+ }
+ else
+ {
+ reference = null;
+
+ if (consumer.getFilter() != null)
+ {
+ // we have iterated over the whole queue looking for
+ // messages that match the consumer filter.
+ // we reset its iterator in case new messages have been added to the queue
+ iterator = messageReferences.iterator();
+ }
+ }
+ return reference;
+ }
+
+ public void remove()
+ {
+ iterator.remove();
+ }
+
+ public void reset()
+ {
+ iterator = messageReferences.iterator();
+ }
+ }
+
+ class NullFilterMessageHandler implements MessageHandler
+ {
+ public MessageReference peek(Consumer consumer)
+ {
+ return messageReferences.peekFirst();
+ }
+
+ public void remove()
+ {
+ messageReferences.removeFirst();
+ }
+
+ public void reset()
+ {
+ //no-op
+ }
+ }
}
Show replies by date