[hornetq-commits] JBoss hornetq SVN: r10188 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 8 16:00:38 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-08 16:00:38 -0500 (Tue, 08 Feb 2011)
New Revision: 10188

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
Fixing Iterators leakage on QueueImpl - https://issues.jboss.org/browse/JBPAPP-5870

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -40,6 +40,7 @@
 import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.LinkedListIterator;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
 
@@ -400,17 +401,24 @@
          Filter filter = FilterImpl.createFilter(filterStr);
          List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
          queue.blockOnExecutorFuture();
-         Iterator<MessageReference> iterator = queue.iterator();
-         while (iterator.hasNext())
+         LinkedListIterator<MessageReference> iterator = queue.iterator();
+         try
          {
-            MessageReference ref = (MessageReference)iterator.next();
-            if (filter == null || filter.match(ref.getMessage()))
+            while (iterator.hasNext())
             {
-               Message message = ref.getMessage();
-               messages.add(message.toMap());
+               MessageReference ref = (MessageReference)iterator.next();
+               if (filter == null || filter.match(ref.getMessage()))
+               {
+                  Message message = ref.getMessage();
+                  messages.add(message.toMap());
+               }
             }
+            return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
          }
-         return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
+         finally
+         {
+            iterator.close();
+         }
       }
       catch (HornetQException e)
       {
@@ -451,17 +459,24 @@
          }
          else
          {
-            Iterator<MessageReference> iterator = queue.iterator();
-            int count = 0;
-            while (iterator.hasNext())
+            LinkedListIterator<MessageReference> iterator = queue.iterator();
+            try
             {
-               MessageReference ref = (MessageReference)iterator.next();
-               if (filter.match(ref.getMessage()))
+               int count = 0;
+               while (iterator.hasNext())
                {
-                  count++;
+                  MessageReference ref = (MessageReference)iterator.next();
+                  if (filter.match(ref.getMessage()))
+                  {
+                     count++;
+                  }
                }
+               return count;
             }
-            return count;
+            finally
+            {
+               iterator.close();
+            }
          }
       }
       finally
@@ -574,7 +589,6 @@
       return moveMessages(filterStr, otherQueueName, false);
    }
 
-
    public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
    {
       checkStarted();
@@ -827,7 +841,7 @@
                obj.put("sessionID", serverConsumer.getSessionID());
                obj.put("browseOnly", serverConsumer.isBrowseOnly());
                obj.put("creationTime", serverConsumer.getCreationTime());
-               
+
                jsonArray.put(obj);
             }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -14,7 +14,6 @@
 package org.hornetq.core.server;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -22,6 +21,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * 
@@ -131,7 +131,7 @@
 
    boolean checkDLQ(MessageReference ref) throws Exception;
 
-   Iterator<MessageReference> iterator();
+   LinkedListIterator<MessageReference> iterator();
 
    void setExpiryAddress(SimpleString expiryAddress);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -93,18 +93,18 @@
    private final boolean temporary;
 
    private final PostOffice postOffice;
-   
+
    private final PageSubscription pageSubscription;
-   
+
    private final LinkedListIterator<PagedReference> pageIterator;
 
    private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
-   
+
    // The quantity of pagedReferences on messageREferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
-   
+
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
@@ -157,7 +157,7 @@
    private volatile boolean checkDirect;
 
    private volatile boolean directDeliver = true;
-   
+
    public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
@@ -171,20 +171,19 @@
                     final Executor executor)
    {
       this(id,
-          address,
-          name,
-          filter,
-          null,
-          durable,
-          temporary,
-          scheduledExecutor,
-          postOffice,
-          storageManager,
-          addressSettingsRepository,
-          executor);
+           address,
+           name,
+           filter,
+           null,
+           durable,
+           temporary,
+           scheduledExecutor,
+           postOffice,
+           storageManager,
+           addressSettingsRepository,
+           executor);
    }
 
-
    public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
@@ -205,7 +204,7 @@
       this.name = name;
 
       this.filter = filter;
-      
+
       this.pageSubscription = pageSubscription;
 
       this.durable = durable;
@@ -230,7 +229,7 @@
       {
          expiryAddress = null;
       }
-      
+
       if (pageSubscription != null)
       {
          pageSubscription.setQueue(this);
@@ -294,7 +293,7 @@
    {
       return name;
    }
-   
+
    public SimpleString getAddress()
    {
       return address;
@@ -309,7 +308,7 @@
    {
       return pageSubscription;
    }
-   
+
    public Filter getFilter()
    {
       return filter;
@@ -328,7 +327,6 @@
       directDeliver = false;
    }
 
-
    public synchronized void reload(final MessageReference ref)
    {
       queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -364,7 +362,11 @@
       // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
       if (checkDirect)
       {
-         if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging())
+         if (direct && !directDeliver &&
+             concurrentQueue.isEmpty() &&
+             messageReferences.isEmpty() &&
+             !pageIterator.hasNext() &&
+             !pageSubscription.isPaging())
          {
             // We must block on the executor to ensure any async deliveries have completed or we might get out of order
             // deliveries
@@ -380,7 +382,7 @@
       {
          return;
       }
-      
+
       queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
 
       concurrentQueue.add(ref);
@@ -614,56 +616,72 @@
       return false;
    }
 
-   public Iterator<MessageReference> iterator()
+   public LinkedListIterator<MessageReference> iterator()
    {
       return new SynchronizedIterator(messageReferences.iterator());
    }
 
    public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
    {
-      Iterator<MessageReference> iterator = iterator();
+      LinkedListIterator<MessageReference> iterator = iterator();
 
-      MessageReference removed = null;
-
-      while (iterator.hasNext())
+      try
       {
-         MessageReference ref = iterator.next();
 
-         if (ref.getMessage().getMessageID() == id)
+         MessageReference removed = null;
+
+         while (iterator.hasNext())
          {
-            iterator.remove();
-            refRemoved(ref);
+            MessageReference ref = iterator.next();
 
-            removed = ref;
+            if (ref.getMessage().getMessageID() == id)
+            {
+               iterator.remove();
+               refRemoved(ref);
 
-            break;
+               removed = ref;
+
+               break;
+            }
          }
+
+         if (removed == null)
+         {
+            // Look in scheduled deliveries
+            removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+         }
+
+         return removed;
       }
-
-      if (removed == null)
+      finally
       {
-         // Look in scheduled deliveries
-         removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+         iterator.close();
       }
-
-      return removed;
    }
 
    public synchronized MessageReference getReference(final long id)
    {
-      Iterator<MessageReference> iterator = iterator();
+      LinkedListIterator<MessageReference> iterator = iterator();
 
-      while (iterator.hasNext())
+      try
       {
-         MessageReference ref = iterator.next();
 
-         if (ref.getMessage().getMessageID() == id)
+         while (iterator.hasNext())
          {
-            return ref;
+            MessageReference ref = iterator.next();
+
+            if (ref.getMessage().getMessageID() == id)
+            {
+               return ref;
+            }
          }
+
+         return null;
       }
-
-      return null;
+      finally
+      {
+         iterator.close();
+      }
    }
 
    public long getMessageCount()
@@ -674,8 +692,11 @@
       {
          if (pageSubscription != null)
          {
-            // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
-            return messageReferences.size() + getScheduledCount()  + deliveringCount.get() +  pageSubscription.getMessageCount();
+            // messageReferences will have depaged messages which we need to discount from the counter as they are
+            // counted on the pageSubscription as well
+            return messageReferences.size() + getScheduledCount() +
+                   deliveringCount.get() +
+                   pageSubscription.getMessageCount();
          }
          else
          {
@@ -709,9 +730,9 @@
       else
       {
          ServerMessage message = ref.getMessage();
-   
+
          boolean durableRef = message.isDurable() && durable;
-   
+
          if (durableRef)
          {
             storageManager.storeAcknowledge(id, message.getMessageID());
@@ -726,22 +747,22 @@
       if (ref.isPaged())
       {
          pageSubscription.ackTx(tx, (PagedReference)ref);
-         
+
          getRefsOperation(tx).addAck(ref);
       }
       else
       {
          ServerMessage message = ref.getMessage();
-   
+
          boolean durableRef = message.isDurable() && durable;
-   
+
          if (durableRef)
          {
             storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-   
+
             tx.setContainsPersistent();
          }
-   
+
          getRefsOperation(tx).addAck(ref);
       }
    }
@@ -756,8 +777,8 @@
       }
 
       getRefsOperation(tx).addAck(ref);
-      
-      //https://issues.jboss.org/browse/HORNETQ-609
+
+      // https://issues.jboss.org/browse/HORNETQ-609
       deliveringCount.incrementAndGet();
    }
 
@@ -848,33 +869,40 @@
 
       Transaction tx = new TransactionImpl(storageManager);
 
-      Iterator<MessageReference> iter = iterator();
-
-      while (iter.hasNext())
+      LinkedListIterator<MessageReference> iter = iterator();
+      try
       {
-         MessageReference ref = iter.next();
 
-         if (filter == null || filter.match(ref.getMessage()))
+         while (iter.hasNext())
          {
+            MessageReference ref = iter.next();
+
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               deliveringCount.incrementAndGet();
+               acknowledge(tx, ref);
+               iter.remove();
+               refRemoved(ref);
+               count++;
+            }
+         }
+
+         List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+         for (MessageReference messageReference : cancelled)
+         {
             deliveringCount.incrementAndGet();
-            acknowledge(tx, ref);
-            iter.remove();
-            refRemoved(ref);
+            acknowledge(tx, messageReference);
             count++;
          }
+
+         tx.commit();
+
+         return count;
       }
-
-      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
-      for (MessageReference messageReference : cancelled)
+      finally
       {
-         deliveringCount.incrementAndGet();
-         acknowledge(tx, messageReference);
-         count++;
+         iter.close();
       }
-
-      tx.commit();
-
-      return count;
    }
 
    public synchronized boolean deleteReference(final long messageID) throws Exception
@@ -883,44 +911,58 @@
 
       Transaction tx = new TransactionImpl(storageManager);
 
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
+      try
+      {
 
-      while (iter.hasNext())
-      {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().getMessageID() == messageID)
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            acknowledge(tx, ref);
-            iter.remove();
-            refRemoved(ref);
-            deleted = true;
-            break;
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == messageID)
+            {
+               deliveringCount.incrementAndGet();
+               acknowledge(tx, ref);
+               iter.remove();
+               refRemoved(ref);
+               deleted = true;
+               break;
+            }
          }
-      }
 
-      tx.commit();
+         tx.commit();
 
-      return deleted;
+         return deleted;
+      }
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized boolean expireReference(final long messageID) throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
+      try
+      {
 
-      while (iter.hasNext())
-      {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().getMessageID() == messageID)
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            expire(ref);
-            iter.remove();
-            refRemoved(ref);
-            return true;
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == messageID)
+            {
+               deliveringCount.incrementAndGet();
+               expire(ref);
+               iter.remove();
+               refRemoved(ref);
+               return true;
+            }
          }
+         return false;
       }
-      return false;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized int expireReferences(final Filter filter) throws Exception
@@ -928,112 +970,150 @@
       Transaction tx = new TransactionImpl(storageManager);
 
       int count = 0;
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (filter == null || filter.match(ref.getMessage()))
+
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            expire(tx, ref);
-            iter.remove();
-            refRemoved(ref);
-            count++;
+            MessageReference ref = iter.next();
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               deliveringCount.incrementAndGet();
+               expire(tx, ref);
+               iter.remove();
+               refRemoved(ref);
+               count++;
+            }
          }
-      }
 
-      tx.commit();
+         tx.commit();
 
-      return count;
+         return count;
+      }
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized void expireReferences() throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().isExpired())
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            expire(ref);
-            iter.remove();
-            refRemoved(ref);
+            MessageReference ref = iter.next();
+            if (ref.getMessage().isExpired())
+            {
+               deliveringCount.incrementAndGet();
+               expire(ref);
+               iter.remove();
+               refRemoved(ref);
+            }
          }
       }
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().getMessageID() == messageID)
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            sendToDeadLetterAddress(ref);
-            iter.remove();
-            refRemoved(ref);
-            return true;
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == messageID)
+            {
+               deliveringCount.incrementAndGet();
+               sendToDeadLetterAddress(ref);
+               iter.remove();
+               refRemoved(ref);
+               return true;
+            }
          }
+         return false;
       }
-      return false;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
    {
       int count = 0;
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (filter == null || filter.match(ref.getMessage()))
+         while (iter.hasNext())
          {
-            deliveringCount.incrementAndGet();
-            sendToDeadLetterAddress(ref);
-            iter.remove();
-            refRemoved(ref);
-            count++;
+            MessageReference ref = iter.next();
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               deliveringCount.incrementAndGet();
+               sendToDeadLetterAddress(ref);
+               iter.remove();
+               refRemoved(ref);
+               count++;
+            }
          }
+         return count;
       }
-      return count;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
    {
       return moveReference(messageID, toAddress, false);
    }
-   
-   public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, final boolean rejectDuplicate) throws Exception
+
+   public synchronized boolean moveReference(final long messageID,
+                                             final SimpleString toAddress,
+                                             final boolean rejectDuplicate) throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().getMessageID() == messageID)
+         while (iter.hasNext())
          {
-            iter.remove();
-            refRemoved(ref);
-            deliveringCount.incrementAndGet();
-            try
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == messageID)
             {
-               move(toAddress, ref);
+               iter.remove();
+               refRemoved(ref);
+               deliveringCount.incrementAndGet();
+               try
+               {
+                  move(toAddress, ref);
+               }
+               catch (Exception e)
+               {
+                  deliveringCount.decrementAndGet();
+                  throw e;
+               }
+               return true;
             }
-            catch (Exception e)
-            {
-               deliveringCount.decrementAndGet();
-               throw e;
-            }
-            return true;
          }
+         return false;
       }
-      return false;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
@@ -1041,7 +1121,9 @@
       return moveReferences(filter, toAddress, false);
    }
 
-   public synchronized int moveReferences(final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates) throws Exception
+   public synchronized int moveReferences(final Filter filter,
+                                          final SimpleString toAddress,
+                                          final boolean rejectDuplicates) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager);
 
@@ -1049,64 +1131,83 @@
 
       try
       {
-         Iterator<MessageReference> iter = iterator();
-         
-         DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
-   
-         while (iter.hasNext())
+         LinkedListIterator<MessageReference> iter = iterator();
+
+         try
          {
-            MessageReference ref = iter.next();
-            if (filter == null || filter.match(ref.getMessage()))
+
+            DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
+
+            while (iter.hasNext())
             {
-               boolean ignored = false;
-               
-               deliveringCount.incrementAndGet();
-               count++;
+               MessageReference ref = iter.next();
+               if (filter == null || filter.match(ref.getMessage()))
+               {
+                  boolean ignored = false;
 
-               if (rejectDuplicates)
-               {
-                  byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
-                  if (duplicateBytes != null)
+                  deliveringCount.incrementAndGet();
+                  count++;
+
+                  if (rejectDuplicates)
                   {
-                     if (targetDuplicateCache.contains(duplicateBytes))
+                     byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+                     if (duplicateBytes != null)
                      {
-                        log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
-                        acknowledge(tx, ref);
-                        ignored = true;
+                        if (targetDuplicateCache.contains(duplicateBytes))
+                        {
+                           log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+                                    " was already set at " +
+                                    toAddress +
+                                    ". Move from " +
+                                    this.address +
+                                    " being ignored and message removed from " +
+                                    this.address);
+                           acknowledge(tx, ref);
+                           ignored = true;
+                        }
                      }
                   }
+
+                  if (!ignored)
+                  {
+                     move(toAddress, tx, ref, false, rejectDuplicates);
+                  }
+                  iter.remove();
                }
-               
-               if (!ignored)
-               {
-                  move(toAddress, tx, ref, false, rejectDuplicates);
-               }
-               iter.remove();
             }
-         }
-   
-         List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
-         for (MessageReference ref : cancelled)
-         {
-            byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
-            if (duplicateBytes != null)
+
+            List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+            for (MessageReference ref : cancelled)
             {
-               if (targetDuplicateCache.contains(duplicateBytes))
+               byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+               if (duplicateBytes != null)
                {
-                  log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored");
-                  continue;
+                  if (targetDuplicateCache.contains(duplicateBytes))
+                  {
+                     log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+                              " was already set at " +
+                              toAddress +
+                              ". Move from " +
+                              this.address +
+                              " being ignored");
+                     continue;
+                  }
                }
+
+               deliveringCount.incrementAndGet();
+               count++;
+               move(toAddress, tx, ref, false, rejectDuplicates);
+               acknowledge(tx, ref);
             }
-   
-            deliveringCount.incrementAndGet();
-            count++;
-            move(toAddress, tx, ref, false, rejectDuplicates);
-            acknowledge(tx, ref);
+
+            tx.commit();
+
+            return count;
          }
-   
-         tx.commit();
-   
-         return count;
+         finally
+         {
+            iter.close();
+         }
       }
       catch (Exception e)
       {
@@ -1117,42 +1218,57 @@
 
    public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (ref.getMessage().getMessageID() == messageID)
+
+         while (iter.hasNext())
          {
-            iter.remove();
-            refRemoved(ref);
-            ref.getMessage().setPriority(newPriority);
-            addTail(ref, false);
-            return true;
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == messageID)
+            {
+               iter.remove();
+               refRemoved(ref);
+               ref.getMessage().setPriority(newPriority);
+               addTail(ref, false);
+               return true;
+            }
          }
+
+         return false;
       }
-
-      return false;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
    {
-      Iterator<MessageReference> iter = iterator();
+      LinkedListIterator<MessageReference> iter = iterator();
 
-      int count = 0;
-      while (iter.hasNext())
+      try
       {
-         MessageReference ref = iter.next();
-         if (filter == null || filter.match(ref.getMessage()))
+         int count = 0;
+         while (iter.hasNext())
          {
-            count++;
-            iter.remove();
-            refRemoved(ref);
-            ref.getMessage().setPriority(newPriority);
-            addTail(ref, false);
+            MessageReference ref = iter.next();
+            if (filter == null || filter.match(ref.getMessage()))
+            {
+               count++;
+               iter.remove();
+               refRemoved(ref);
+               ref.getMessage().setPriority(newPriority);
+               addTail(ref, false);
+            }
          }
+         return count;
       }
-      return count;
+      finally
+      {
+         iter.close();
+      }
    }
 
    public synchronized void resetAllIterators()
@@ -1179,7 +1295,7 @@
    {
       return paused;
    }
-   
+
    public boolean isDirectDeliver()
    {
       return directDeliver;
@@ -1225,7 +1341,6 @@
       messageReferences.addTail(ref, ref.getMessage().getPriority());
    }
 
-
    /**
     * @param ref
     */
@@ -1236,7 +1351,6 @@
       messageReferences.addHead(ref, ref.getMessage().getPriority());
    }
 
-
    private synchronized void doPoll()
    {
       MessageReference ref = concurrentQueue.poll();
@@ -1315,7 +1429,7 @@
             if (checkExpired(ref))
             {
                holder.iter.remove();
-               
+
                refRemoved(ref);
 
                continue;
@@ -1342,7 +1456,7 @@
             if (status == HandleStatus.HANDLED)
             {
                holder.iter.remove();
-               
+
                refRemoved(ref);
 
                if (groupID != null && groupConsumer == null)
@@ -1382,14 +1496,13 @@
             pos = 0;
          }
       }
-      
+
       if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
       {
          scheduleDepage();
       }
    }
 
-
    /**
     * @param ref
     */
@@ -1401,7 +1514,7 @@
          pagedReferences.decrementAndGet();
       }
    }
-   
+
    /**
     * @param ref
     */
@@ -1413,31 +1526,29 @@
       }
    }
 
-   
    private void scheduleDepage()
    {
       executor.execute(depageRunner);
    }
-   
+
    private void depage()
    {
       if (paused || pageIterator == null || consumerList.isEmpty())
       {
          return;
       }
-      
+
       long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
-      
-      //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+
+      // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
       while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
       {
          PagedReference reference = pageIterator.next();
          addTail(reference, false);
          pageIterator.remove();
       }
-      //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
-      
-      
+      // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+
       deliverAsync();
    }
 
@@ -1496,7 +1607,7 @@
          return true;
       }
    }
-   
+
    /** Used on testing only **/
    public int getNumberOfReferences()
    {
@@ -1746,7 +1857,7 @@
       QueueImpl queue = (QueueImpl)ref.getQueue();
 
       queue.deliveringCount.decrementAndGet();
-      
+
       if (queue.deliveringCount.get() < 0)
       {
          new Exception("DeliveringCount became negative").printStackTrace();
@@ -1839,6 +1950,7 @@
    private final class RefsOperation implements TransactionOperation
    {
       List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
       List<ServerMessage> pagedMessagesToPostACK = null;
 
       synchronized void addAck(final MessageReference ref)
@@ -1912,7 +2024,7 @@
                postAcknowledge(ref);
             }
          }
-         
+
          if (pagedMessagesToPostACK != null)
          {
             for (ServerMessage msg : pagedMessagesToPostACK)
@@ -1936,8 +2048,9 @@
       public void beforeRollback(final Transaction tx) throws Exception
       {
       }
-      
-      public List<MessageReference> getRelatedMessageReferences() {
+
+      public List<MessageReference> getRelatedMessageReferences()
+      {
          return refsToAck;
       }
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -45,6 +45,7 @@
 import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.remoting.ReadyListener;
 import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListIterator;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -100,7 +101,7 @@
     */
    private final boolean browseOnly;
 
-   private Runnable browserDeliverer;
+   private BrowserDeliverer browserDeliverer;
 
    private final boolean strictUpdateDeliveryCount;
 
@@ -313,8 +314,12 @@
          largeMessageDeliverer.finish();
       }
 
-      if (!browseOnly)
+      if (browseOnly)
       {
+         browserDeliverer.close();
+      }
+      else
+      {
          messageQueue.removeConsumer(this);
       }
 
@@ -901,12 +906,17 @@
    {
       private MessageReference current = null;
 
-      public BrowserDeliverer(final Iterator<MessageReference> iterator)
+      public BrowserDeliverer(final LinkedListIterator<MessageReference> iterator)
       {
          this.iterator = iterator;
       }
 
-      private final Iterator<MessageReference> iterator;
+      private final LinkedListIterator<MessageReference> iterator;
+      
+      public synchronized void close()
+      {
+         iterator.close();
+      }
 
       public synchronized void run()
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -155,11 +155,18 @@
       private LinkedListIterator<T> lastIter;
       
       private int resetCount = lastReset;
+      
+      volatile boolean closed = false;
 
       PriorityLinkedListIterator()
       {
          index = levels.length - 1;
       }
+      
+      protected void finalize()
+      {
+         close();
+      }
 
       public void repeat()
       {
@@ -173,13 +180,17 @@
 
       public void close()
       {
-         lastIter = null;
-
-         for (LinkedListIterator<T> iter : cachedIters)
+         if (!closed)
          {
-            if (iter != null)
+            closed = true;
+            lastIter = null;
+   
+            for (LinkedListIterator<T> iter : cachedIters)
             {
-               iter.close();
+               if (iter != null)
+               {
+                  iter.close();
+               }
             }
          }
       }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-08 21:00:38 UTC (rev 10188)
@@ -13,7 +13,6 @@
 
 package org.hornetq.tests.unit.core.postoffice.impl;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * A FakeQueue
@@ -431,7 +431,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#iterator()
     */
-   public Iterator<MessageReference> iterator()
+   public LinkedListIterator<MessageReference> iterator()
    {
       // TODO Auto-generated method stub
       return null;



More information about the hornetq-commits mailing list