[hornetq-commits] JBoss hornetq SVN: r8785 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 11 07:01:07 EST 2010


Author: ataylor
Date: 2010-01-11 07:01:06 -0500 (Mon, 11 Jan 2010)
New Revision: 8785

Modified:
   trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-266 - now only cancel scheduled messages that match the filter.

Modified: trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java	2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java	2010-01-11 12:01:06 UTC (rev 8785)
@@ -12,6 +12,8 @@
  */
 package org.hornetq.core.server;
 
+import org.hornetq.core.filter.Filter;
+
 import java.util.List;
 
 /**
@@ -21,13 +23,11 @@
 {
    boolean checkAndSchedule(MessageReference ref);
 
-   void reSchedule();
-
    int getScheduledCount();
 
    List<MessageReference> getScheduledReferences();
 
-   List<MessageReference> cancel();
+   List<MessageReference> cancel(Filter filter);
 
    MessageReference removeReferenceWithID(long id);
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 12:01:06 UTC (rev 8785)
@@ -54,7 +54,6 @@
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
 
 /**
  * Implementation of a Queue
@@ -646,15 +645,12 @@
          }
       }
 
-      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
       for (MessageReference messageReference : cancelled)
       {
-         if (filter == null || filter.match(messageReference.getMessage()))
-         {
-            deliveringCount.incrementAndGet();
-            acknowledge(tx, messageReference);
-            count++;
-         }
+         deliveringCount.incrementAndGet();
+         acknowledge(tx, messageReference);
+         count++;
       }
 
       tx.commit();
@@ -801,16 +797,13 @@
          }
       }
 
-      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
       for (MessageReference ref : cancelled)
       {
-         if (filter == null || filter.match(ref.getMessage()))
-         {
-            deliveringCount.incrementAndGet();
-            move(toAddress, tx, ref, false);
-            acknowledge(tx, ref);
-            count++;
-         }
+         deliveringCount.incrementAndGet();
+         move(toAddress, tx, ref, false);
+         acknowledge(tx, ref);
+         count++;
       }
 
       tx.commit();

Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2010-01-11 12:01:06 UTC (rev 8785)
@@ -20,6 +20,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ScheduledDeliveryHandler;
@@ -42,8 +43,6 @@
 
    private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
 
-   private boolean rescheduled;
-
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
    {
       this.scheduledExecutor = scheduledExecutor;
@@ -74,22 +73,6 @@
       return false;
    }
 
-   public void reSchedule()
-   {
-      synchronized (scheduledRunnables)
-      {
-         if (!rescheduled)
-         {
-            for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
-            {
-               scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
-            }
-
-            rescheduled = true;
-         }
-      }
-   }
-
    public int getScheduledCount()
    {
       return scheduledRunnables.size();
@@ -109,20 +92,26 @@
       return refs;
    }
 
-   public List<MessageReference> cancel()
+   public List<MessageReference> cancel(final Filter filter)
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
 
       synchronized (scheduledRunnables)
       {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
+         Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
+         for (ScheduledDeliveryRunnable runnable : copy.values())
          {
-            runnable.cancel();
+            if (filter == null || filter.match(runnable.getReference().getMessage()))
+            {
+               runnable.cancel();
 
-            refs.add(runnable.getReference());
+               refs.add(runnable.getReference());
+            }
          }
-
-         scheduledRunnables.clear();
+         for (MessageReference ref : refs)
+         {
+            scheduledRunnables.remove(ref.getMessage().getMessageID());
+         }
       }
       return refs;
    }



More information about the hornetq-commits mailing list