[hornetq-commits] JBoss hornetq SVN: r8270 - trunk/src/main/org/hornetq/core/server/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 12 08:00:07 EST 2009


Author: ataylor
Date: 2009-11-12 08:00:07 -0500 (Thu, 12 Nov 2009)
New Revision: 8270

Modified:
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
fix to reset iterators when consumer is busy + some refactoring

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-12 05:23:09 UTC (rev 8269)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-12 13:00:07 UTC (rev 8270)
@@ -88,6 +88,8 @@
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
+   private final MessageHandler globalHandler = new NullFilterMessageHandler();
+
    private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
 
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
@@ -128,7 +130,7 @@
 
    private final Set<Consumer> consumers = new HashSet<Consumer>();
 
-   private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+   private final Map<Consumer, MessageHandler> messageHandlers = new HashMap<Consumer, MessageHandler>();
 
    private final ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
 
@@ -278,7 +280,7 @@
       consumers.add(consumer);
       if (consumer.getFilter() != null)
       {
-         iterators.put(consumer, messageReferences.iterator());
+         messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
       }
    }
 
@@ -292,8 +294,9 @@
       }
 
       consumers.remove(consumer);
-      iterators.remove(consumer);
 
+      messageHandlers.remove(consumer);
+
       if (removed)
       {
          for (SimpleString groupID : groups.keySet())
@@ -1035,8 +1038,6 @@
 
       MessageReference reference;
 
-      Iterator<MessageReference> iterator = null;
-
       // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
       int totalConsumers = distributionPolicy.getConsumerCount();
       Set<Consumer> busyConsumers = new HashSet<Consumer>();
@@ -1046,31 +1047,13 @@
       {
          consumer = distributionPolicy.getNextConsumer();
 
-         iterator = iterators.get(consumer);
-
-         if (iterator == null)
+         MessageHandler handler = messageHandlers.get(consumer);
+         if(handler == null)
          {
-            reference = messageReferences.peekFirst();
+            handler = globalHandler;
          }
-         else
-         {
-            if (iterator.hasNext())
-            {
-               reference = iterator.next();
-            }
-            else
-            {
-               reference = null;
 
-               if (consumer.getFilter() != null)
-               {
-                  // we have iterated on the whole queue for
-                  // messages which matches the consumer filter.
-                  // we reset its iterator in case new messages are added to the queue
-                  iterators.put(consumer, messageReferences.iterator());
-               }
-            }
-         }
+         reference = handler.peek(consumer);
 
          if (reference == null)
          {
@@ -1095,14 +1078,7 @@
             if (reference.getMessage().isExpired())
             {
                // We expire messages on the server too
-               if (iterator == null)
-               {
-                  messageReferences.removeFirst();
-               }
-               else
-               {
-                  iterator.remove();
-               }
+               handler.remove();
 
                reference.handled();
 
@@ -1134,18 +1110,14 @@
 
          if (status == HandleStatus.HANDLED)
          {
-            if (iterator == null)
-            {
-               messageReferences.removeFirst();
-            }
-            else
-            {
-               iterator.remove();
-            }
+            handler.remove();
          }
          else if (status == HandleStatus.BUSY)
          {
             busyConsumers.add(consumer);
+
+            handler.reset();
+
             if (groupID != null || busyConsumers.size() == totalConsumers)
             {
                // when all consumers are busy, we stop
@@ -1159,8 +1131,6 @@
             {
                groups.remove(consumer);
             }
-
-            continue;
          }
       }
    }
@@ -1539,4 +1509,73 @@
    {
       return paused;
    }
+
+   interface MessageHandler
+   {
+      MessageReference peek(Consumer consumer);
+
+      void remove();
+
+      void reset();
+   }
+
+   class FilterMessageHandler implements MessageHandler
+   {
+      private Iterator<MessageReference> iterator;
+
+      public FilterMessageHandler(Iterator<MessageReference> iterator)
+      {
+         this.iterator = iterator;
+      }
+
+      public MessageReference peek(Consumer consumer)
+      {
+         MessageReference reference;
+         if (iterator.hasNext())
+         {
+            reference = iterator.next();
+         }
+         else
+         {
+            reference = null;
+
+            if (consumer.getFilter() != null)
+            {
+               // we have iterated on the whole queue for
+               // messages which matches the consumer filter.
+               // we reset its iterator in case new messages are added to the queue
+               iterator = messageReferences.iterator();
+            }
+         }
+         return reference;
+      }
+
+      public void remove()
+      {
+         iterator.remove();
+      }
+
+      public void reset()
+      {
+         iterator = messageReferences.iterator();
+      }
+   }
+
+   class NullFilterMessageHandler implements MessageHandler
+   {
+      public MessageReference peek(Consumer consumer)
+      {
+         return messageReferences.peekFirst();
+      }
+
+      public void remove()
+      {
+         messageReferences.removeFirst();
+      }
+
+      public void reset()
+      {
+         //no-op
+      }
+   }
 }



More information about the hornetq-commits mailing list