[hornetq-commits] JBoss hornetq SVN: r10188 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Feb 8 16:00:38 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-08 16:00:38 -0500 (Tue, 08 Feb 2011)
New Revision: 10188
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
Fixing Iterators leakage on QueueImpl - https://issues.jboss.org/browse/JBPAPP-5870
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -40,6 +40,7 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -400,17 +401,24 @@
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
queue.blockOnExecutorFuture();
- Iterator<MessageReference> iterator = queue.iterator();
- while (iterator.hasNext())
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+ try
{
- MessageReference ref = (MessageReference)iterator.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iterator.hasNext())
{
- Message message = ref.getMessage();
- messages.add(message.toMap());
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ Message message = ref.getMessage();
+ messages.add(message.toMap());
+ }
}
+ return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
}
- return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
+ finally
+ {
+ iterator.close();
+ }
}
catch (HornetQException e)
{
@@ -451,17 +459,24 @@
}
else
{
- Iterator<MessageReference> iterator = queue.iterator();
- int count = 0;
- while (iterator.hasNext())
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+ try
{
- MessageReference ref = (MessageReference)iterator.next();
- if (filter.match(ref.getMessage()))
+ int count = 0;
+ while (iterator.hasNext())
{
- count++;
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter.match(ref.getMessage()))
+ {
+ count++;
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iterator.close();
+ }
}
}
finally
@@ -574,7 +589,6 @@
return moveMessages(filterStr, otherQueueName, false);
}
-
public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
{
checkStarted();
@@ -827,7 +841,7 @@
obj.put("sessionID", serverConsumer.getSessionID());
obj.put("browseOnly", serverConsumer.isBrowseOnly());
obj.put("creationTime", serverConsumer.getCreationTime());
-
+
jsonArray.put(obj);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -14,7 +14,6 @@
package org.hornetq.core.server;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@@ -22,6 +21,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
*
@@ -131,7 +131,7 @@
boolean checkDLQ(MessageReference ref) throws Exception;
- Iterator<MessageReference> iterator();
+ LinkedListIterator<MessageReference> iterator();
void setExpiryAddress(SimpleString expiryAddress);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -93,18 +93,18 @@
private final boolean temporary;
private final PostOffice postOffice;
-
+
private final PageSubscription pageSubscription;
-
+
private final LinkedListIterator<PagedReference> pageIterator;
private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
-
+
// The quantity of pagedReferences on messageREferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
-
+
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
@@ -157,7 +157,7 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
-
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -171,20 +171,19 @@
final Executor executor)
{
this(id,
- address,
- name,
- filter,
- null,
- durable,
- temporary,
- scheduledExecutor,
- postOffice,
- storageManager,
- addressSettingsRepository,
- executor);
+ address,
+ name,
+ filter,
+ null,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
}
-
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -205,7 +204,7 @@
this.name = name;
this.filter = filter;
-
+
this.pageSubscription = pageSubscription;
this.durable = durable;
@@ -230,7 +229,7 @@
{
expiryAddress = null;
}
-
+
if (pageSubscription != null)
{
pageSubscription.setQueue(this);
@@ -294,7 +293,7 @@
{
return name;
}
-
+
public SimpleString getAddress()
{
return address;
@@ -309,7 +308,7 @@
{
return pageSubscription;
}
-
+
public Filter getFilter()
{
return filter;
@@ -328,7 +327,6 @@
directDeliver = false;
}
-
public synchronized void reload(final MessageReference ref)
{
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -364,7 +362,11 @@
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
if (checkDirect)
{
- if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging())
+ if (direct && !directDeliver &&
+ concurrentQueue.isEmpty() &&
+ messageReferences.isEmpty() &&
+ !pageIterator.hasNext() &&
+ !pageSubscription.isPaging())
{
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries
@@ -380,7 +382,7 @@
{
return;
}
-
+
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -614,56 +616,72 @@
return false;
}
- public Iterator<MessageReference> iterator()
+ public LinkedListIterator<MessageReference> iterator()
{
return new SynchronizedIterator(messageReferences.iterator());
}
public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
{
- Iterator<MessageReference> iterator = iterator();
+ LinkedListIterator<MessageReference> iterator = iterator();
- MessageReference removed = null;
-
- while (iterator.hasNext())
+ try
{
- MessageReference ref = iterator.next();
- if (ref.getMessage().getMessageID() == id)
+ MessageReference removed = null;
+
+ while (iterator.hasNext())
{
- iterator.remove();
- refRemoved(ref);
+ MessageReference ref = iterator.next();
- removed = ref;
+ if (ref.getMessage().getMessageID() == id)
+ {
+ iterator.remove();
+ refRemoved(ref);
- break;
+ removed = ref;
+
+ break;
+ }
}
+
+ if (removed == null)
+ {
+ // Look in scheduled deliveries
+ removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+ }
+
+ return removed;
}
-
- if (removed == null)
+ finally
{
- // Look in scheduled deliveries
- removed = scheduledDeliveryHandler.removeReferenceWithID(id);
+ iterator.close();
}
-
- return removed;
}
public synchronized MessageReference getReference(final long id)
{
- Iterator<MessageReference> iterator = iterator();
+ LinkedListIterator<MessageReference> iterator = iterator();
- while (iterator.hasNext())
+ try
{
- MessageReference ref = iterator.next();
- if (ref.getMessage().getMessageID() == id)
+ while (iterator.hasNext())
{
- return ref;
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id)
+ {
+ return ref;
+ }
}
+
+ return null;
}
-
- return null;
+ finally
+ {
+ iterator.close();
+ }
}
public long getMessageCount()
@@ -674,8 +692,11 @@
{
if (pageSubscription != null)
{
- // messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
- return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+ // messageReferences will have depaged messages which we need to discount from the counter as they are
+ // counted on the pageSubscription as well
+ return messageReferences.size() + getScheduledCount() +
+ deliveringCount.get() +
+ pageSubscription.getMessageCount();
}
else
{
@@ -709,9 +730,9 @@
else
{
ServerMessage message = ref.getMessage();
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
storageManager.storeAcknowledge(id, message.getMessageID());
@@ -726,22 +747,22 @@
if (ref.isPaged())
{
pageSubscription.ackTx(tx, (PagedReference)ref);
-
+
getRefsOperation(tx).addAck(ref);
}
else
{
ServerMessage message = ref.getMessage();
-
+
boolean durableRef = message.isDurable() && durable;
-
+
if (durableRef)
{
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
+
tx.setContainsPersistent();
}
-
+
getRefsOperation(tx).addAck(ref);
}
}
@@ -756,8 +777,8 @@
}
getRefsOperation(tx).addAck(ref);
-
- //https://issues.jboss.org/browse/HORNETQ-609
+
+ // https://issues.jboss.org/browse/HORNETQ-609
deliveringCount.incrementAndGet();
}
@@ -848,33 +869,40 @@
Transaction tx = new TransactionImpl(storageManager);
- Iterator<MessageReference> iter = iterator();
-
- while (iter.hasNext())
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iter.hasNext())
{
+ MessageReference ref = iter.next();
+
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
+ }
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+ for (MessageReference messageReference : cancelled)
+ {
deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- refRemoved(ref);
+ acknowledge(tx, messageReference);
count++;
}
+
+ tx.commit();
+
+ return count;
}
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
- for (MessageReference messageReference : cancelled)
+ finally
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
+ iter.close();
}
-
- tx.commit();
-
- return count;
}
public synchronized boolean deleteReference(final long messageID) throws Exception
@@ -883,44 +911,58 @@
Transaction tx = new TransactionImpl(storageManager);
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
+ {
- while (iter.hasNext())
- {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- refRemoved(ref);
- deleted = true;
- break;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ deleted = true;
+ break;
+ }
}
- }
- tx.commit();
+ tx.commit();
- return deleted;
+ return deleted;
+ }
+ finally
+ {
+ iter.close();
+ }
}
public synchronized boolean expireReference(final long messageID) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
+ try
+ {
- while (iter.hasNext())
- {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int expireReferences(final Filter filter) throws Exception
@@ -928,112 +970,150 @@
Transaction tx = new TransactionImpl(storageManager);
int count = 0;
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(tx, ref);
- iter.remove();
- refRemoved(ref);
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ expire(tx, ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
}
- }
- tx.commit();
+ tx.commit();
- return count;
+ return count;
+ }
+ finally
+ {
+ iter.close();
+ }
}
public synchronized void expireReferences() throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ MessageReference ref = iter.next();
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
}
}
+ finally
+ {
+ iter.close();
+ }
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- sendToDeadLetterAddress(ref);
- iter.remove();
- refRemoved(ref);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ refRemoved(ref);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
{
int count = 0;
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- sendToDeadLetterAddress(ref);
- iter.remove();
- refRemoved(ref);
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ refRemoved(ref);
+ count++;
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iter.close();
+ }
}
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
return moveReference(messageID, toAddress, false);
}
-
- public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, final boolean rejectDuplicate) throws Exception
+
+ public synchronized boolean moveReference(final long messageID,
+ final SimpleString toAddress,
+ final boolean rejectDuplicate) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+ while (iter.hasNext())
{
- iter.remove();
- refRemoved(ref);
- deliveringCount.incrementAndGet();
- try
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
{
- move(toAddress, ref);
+ iter.remove();
+ refRemoved(ref);
+ deliveringCount.incrementAndGet();
+ try
+ {
+ move(toAddress, ref);
+ }
+ catch (Exception e)
+ {
+ deliveringCount.decrementAndGet();
+ throw e;
+ }
+ return true;
}
- catch (Exception e)
- {
- deliveringCount.decrementAndGet();
- throw e;
- }
- return true;
}
+ return false;
}
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
@@ -1041,7 +1121,9 @@
return moveReferences(filter, toAddress, false);
}
- public synchronized int moveReferences(final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates) throws Exception
+ public synchronized int moveReferences(final Filter filter,
+ final SimpleString toAddress,
+ final boolean rejectDuplicates) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -1049,64 +1131,83 @@
try
{
- Iterator<MessageReference> iter = iterator();
-
- DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
-
- while (iter.hasNext())
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+
+ DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
+
+ while (iter.hasNext())
{
- boolean ignored = false;
-
- deliveringCount.incrementAndGet();
- count++;
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ boolean ignored = false;
- if (rejectDuplicates)
- {
- byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
- if (duplicateBytes != null)
+ deliveringCount.incrementAndGet();
+ count++;
+
+ if (rejectDuplicates)
{
- if (targetDuplicateCache.contains(duplicateBytes))
+ byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
{
- log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
- acknowledge(tx, ref);
- ignored = true;
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+ " was already set at " +
+ toAddress +
+ ". Move from " +
+ this.address +
+ " being ignored and message removed from " +
+ this.address);
+ acknowledge(tx, ref);
+ ignored = true;
+ }
}
}
+
+ if (!ignored)
+ {
+ move(toAddress, tx, ref, false, rejectDuplicates);
+ }
+ iter.remove();
}
-
- if (!ignored)
- {
- move(toAddress, tx, ref, false, rejectDuplicates);
- }
- iter.remove();
}
- }
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
- for (MessageReference ref : cancelled)
- {
- byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
- if (duplicateBytes != null)
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+ for (MessageReference ref : cancelled)
{
- if (targetDuplicateCache.contains(duplicateBytes))
+ byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
{
- log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored");
- continue;
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() +
+ " was already set at " +
+ toAddress +
+ ". Move from " +
+ this.address +
+ " being ignored");
+ continue;
+ }
}
+
+ deliveringCount.incrementAndGet();
+ count++;
+ move(toAddress, tx, ref, false, rejectDuplicates);
+ acknowledge(tx, ref);
}
-
- deliveringCount.incrementAndGet();
- count++;
- move(toAddress, tx, ref, false, rejectDuplicates);
- acknowledge(tx, ref);
+
+ tx.commit();
+
+ return count;
}
-
- tx.commit();
-
- return count;
+ finally
+ {
+ iter.close();
+ }
}
catch (Exception e)
{
@@ -1117,42 +1218,57 @@
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID)
+
+ while (iter.hasNext())
{
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
- return true;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ iter.remove();
+ refRemoved(ref);
+ ref.getMessage().setPriority(newPriority);
+ addTail(ref, false);
+ return true;
+ }
}
+
+ return false;
}
-
- return false;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
{
- Iterator<MessageReference> iter = iterator();
+ LinkedListIterator<MessageReference> iter = iterator();
- int count = 0;
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ int count = 0;
+ while (iter.hasNext())
{
- count++;
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ count++;
+ iter.remove();
+ refRemoved(ref);
+ ref.getMessage().setPriority(newPriority);
+ addTail(ref, false);
+ }
}
+ return count;
}
- return count;
+ finally
+ {
+ iter.close();
+ }
}
public synchronized void resetAllIterators()
@@ -1179,7 +1295,7 @@
{
return paused;
}
-
+
public boolean isDirectDeliver()
{
return directDeliver;
@@ -1225,7 +1341,6 @@
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
-
/**
* @param ref
*/
@@ -1236,7 +1351,6 @@
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
-
private synchronized void doPoll()
{
MessageReference ref = concurrentQueue.poll();
@@ -1315,7 +1429,7 @@
if (checkExpired(ref))
{
holder.iter.remove();
-
+
refRemoved(ref);
continue;
@@ -1342,7 +1456,7 @@
if (status == HandleStatus.HANDLED)
{
holder.iter.remove();
-
+
refRemoved(ref);
if (groupID != null && groupConsumer == null)
@@ -1382,14 +1496,13 @@
pos = 0;
}
}
-
+
if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
{
scheduleDepage();
}
}
-
/**
* @param ref
*/
@@ -1401,7 +1514,7 @@
pagedReferences.decrementAndGet();
}
}
-
+
/**
* @param ref
*/
@@ -1413,31 +1526,29 @@
}
}
-
private void scheduleDepage()
{
executor.execute(depageRunner);
}
-
+
private void depage()
{
if (paused || pageIterator == null || consumerList.isEmpty())
{
return;
}
-
+
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
-
- //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+
+ // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
PagedReference reference = pageIterator.next();
addTail(reference, false);
pageIterator.remove();
}
- //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
-
-
+ // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+
deliverAsync();
}
@@ -1496,7 +1607,7 @@
return true;
}
}
-
+
/** Used on testing only **/
public int getNumberOfReferences()
{
@@ -1746,7 +1857,7 @@
QueueImpl queue = (QueueImpl)ref.getQueue();
queue.deliveringCount.decrementAndGet();
-
+
if (queue.deliveringCount.get() < 0)
{
new Exception("DeliveringCount became negative").printStackTrace();
@@ -1839,6 +1950,7 @@
private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
List<ServerMessage> pagedMessagesToPostACK = null;
synchronized void addAck(final MessageReference ref)
@@ -1912,7 +2024,7 @@
postAcknowledge(ref);
}
}
-
+
if (pagedMessagesToPostACK != null)
{
for (ServerMessage msg : pagedMessagesToPostACK)
@@ -1936,8 +2048,9 @@
public void beforeRollback(final Transaction tx) throws Exception
{
}
-
- public List<MessageReference> getRelatedMessageReferences() {
+
+ public List<MessageReference> getRelatedMessageReferences()
+ {
return refsToAck;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -45,6 +45,7 @@
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.TypedProperties;
/**
@@ -100,7 +101,7 @@
*/
private final boolean browseOnly;
- private Runnable browserDeliverer;
+ private BrowserDeliverer browserDeliverer;
private final boolean strictUpdateDeliveryCount;
@@ -313,8 +314,12 @@
largeMessageDeliverer.finish();
}
- if (!browseOnly)
+ if (browseOnly)
{
+ browserDeliverer.close();
+ }
+ else
+ {
messageQueue.removeConsumer(this);
}
@@ -901,12 +906,17 @@
{
private MessageReference current = null;
- public BrowserDeliverer(final Iterator<MessageReference> iterator)
+ public BrowserDeliverer(final LinkedListIterator<MessageReference> iterator)
{
this.iterator = iterator;
}
- private final Iterator<MessageReference> iterator;
+ private final LinkedListIterator<MessageReference> iterator;
+
+ public synchronized void close()
+ {
+ iterator.close();
+ }
public synchronized void run()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -155,11 +155,18 @@
private LinkedListIterator<T> lastIter;
private int resetCount = lastReset;
+
+ volatile boolean closed = false;
PriorityLinkedListIterator()
{
index = levels.length - 1;
}
+
+ protected void finalize()
+ {
+ close();
+ }
public void repeat()
{
@@ -173,13 +180,17 @@
public void close()
{
- lastIter = null;
-
- for (LinkedListIterator<T> iter : cachedIters)
+ if (!closed)
{
- if (iter != null)
+ closed = true;
+ lastIter = null;
+
+ for (LinkedListIterator<T> iter : cachedIters)
{
- iter.close();
+ if (iter != null)
+ {
+ iter.close();
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-08 18:18:54 UTC (rev 10187)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-08 21:00:38 UTC (rev 10188)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.postoffice.impl;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
* A FakeQueue
@@ -431,7 +431,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#iterator()
*/
- public Iterator<MessageReference> iterator()
+ public LinkedListIterator<MessageReference> iterator()
{
// TODO Auto-generated method stub
return null;
More information about the hornetq-commits
mailing list