Author: ataylor
Date: 2010-01-11 07:01:06 -0500 (Mon, 11 Jan 2010)
New Revision: 8785
Modified:
trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-266 - now only cancel scheduled messages that
match the filter.
Modified: trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.server;
+import org.hornetq.core.filter.Filter;
+
import java.util.List;
/**
@@ -21,13 +23,11 @@
{
boolean checkAndSchedule(MessageReference ref);
- void reSchedule();
-
int getScheduledCount();
List<MessageReference> getScheduledReferences();
- List<MessageReference> cancel();
+ List<MessageReference> cancel(Filter filter);
MessageReference removeReferenceWithID(long id);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -54,7 +54,6 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
/**
* Implementation of a Queue
@@ -646,15 +645,12 @@
}
}
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference messageReference : cancelled)
{
- if (filter == null || filter.match(messageReference.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
}
tx.commit();
@@ -801,16 +797,13 @@
}
}
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference ref : cancelled)
{
- if (filter == null || filter.match(ref.getMessage()))
- {
- deliveringCount.incrementAndGet();
- move(toAddress, tx, ref, false);
- acknowledge(tx, ref);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ move(toAddress, tx, ref, false);
+ acknowledge(tx, ref);
+ count++;
}
tx.commit();
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ScheduledDeliveryHandler;
@@ -42,8 +43,6 @@
private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
- private boolean rescheduled;
-
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
{
this.scheduledExecutor = scheduledExecutor;
@@ -74,22 +73,6 @@
return false;
}
- public void reSchedule()
- {
- synchronized (scheduledRunnables)
- {
- if (!rescheduled)
- {
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
- {
- scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
- }
-
- rescheduled = true;
- }
- }
- }
-
public int getScheduledCount()
{
return scheduledRunnables.size();
@@ -109,20 +92,26 @@
return refs;
}
- public List<MessageReference> cancel()
+ public List<MessageReference> cancel(final Filter filter)
{
List<MessageReference> refs = new ArrayList<MessageReference>();
synchronized (scheduledRunnables)
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
+ Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
+ for (ScheduledDeliveryRunnable runnable : copy.values())
{
- runnable.cancel();
+ if (filter == null || filter.match(runnable.getReference().getMessage()))
+ {
+ runnable.cancel();
- refs.add(runnable.getReference());
+ refs.add(runnable.getReference());
+ }
}
-
- scheduledRunnables.clear();
+ for (MessageReference ref : refs)
+ {
+ scheduledRunnables.remove(ref.getMessage().getMessageID());
+ }
}
return refs;
}