[jboss-cvs] JBoss Messaging SVN: r5207 - in trunk/src/main/org/jboss/messaging/core/server: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 29 12:54:51 EDT 2008


Author: ataylor
Date: 2008-10-29 12:54:50 -0400 (Wed, 29 Oct 2008)
New Revision: 5207

Added:
   trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1323 - abstracted out the scheduling functionality

Added: trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	2008-10-29 16:54:50 UTC (rev 5207)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface ScheduledDeliveryHandler
+{
+   boolean checkAndSchedule(MessageReference ref, boolean backup);
+
+   void reSchedule();
+
+   int getScheduledCount();
+
+   List<MessageReference> getScheduledMessages();
+
+   List<MessageReference> cancel();
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-29 16:52:07 UTC (rev 5206)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-29 16:54:50 UTC (rev 5207)
@@ -14,15 +14,11 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,6 +36,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -74,13 +71,11 @@
 
    private final boolean temporary;
 
-   private final ScheduledExecutorService scheduledExecutor;
-
    private final PostOffice postOffice;
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
-   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+   private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
 
@@ -125,11 +120,11 @@
 
       this.temporary = temporary;
 
-      this.scheduledExecutor = scheduledExecutor;
-
       this.postOffice = postOffice;
 
       direct = true;
+
+      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
    }
 
    // Queue implementation
@@ -177,7 +172,7 @@
          
          ServerMessage msg = ref.getMessage();
 
-         if (!checkAndSchedule(ref))
+         if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
          {
             messageReferences.addFirst(ref, msg.getPriority());
          }
@@ -309,17 +304,12 @@
 
    public synchronized int getScheduledCount()
    {
-      return scheduledRunnables.size();
+      return scheduledDeliveryHandler.getScheduledCount();
    }
 
    public synchronized List<MessageReference> getScheduledMessages()
    {
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-      {
-         refs.add(runnable.getReference());
-      }
-      return refs;
+      return scheduledDeliveryHandler.getScheduledMessages();
    }
 
    public int getDeliveringCount()
@@ -391,18 +381,12 @@
          iter.remove();
       }
 
-      synchronized (scheduledRunnables)
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      for (MessageReference messageReference : cancelled)
       {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            runnable.cancel();
+          deliveringCount.incrementAndGet();
 
-            deliveringCount.incrementAndGet();
-
-            tx.addAcknowledgement(runnable.getReference());
-         }
-
-         scheduledRunnables.clear();
+          tx.addAcknowledgement(messageReference);
       }
 
       tx.commit();
@@ -557,10 +541,7 @@
 
          backup = false;
 
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
-         }
+         scheduledDeliveryHandler.reSchedule();
 
          return true;
       }
@@ -679,7 +660,7 @@
          sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
       }
 
-      if (checkAndSchedule(ref))
+      if (scheduledDeliveryHandler.checkAndSchedule(ref, backup))
       {
          return HandleStatus.HANDLED;
       }
@@ -741,42 +722,6 @@
       return HandleStatus.HANDLED;
    }
 
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long deliveryTime = ref.getScheduledDeliveryTime();
-
-      if (deliveryTime != 0 && scheduledExecutor != null)
-      {
-         if (trace)
-         {
-            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
-         }
-
-         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
-         scheduledRunnables.add(runnable);
-
-         if (!backup)
-         {
-            scheduleDelivery(runnable, deliveryTime);
-         }
-
-         return true;
-      }
-      return false;
-   }
-
-   private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
-   {
-      long now = System.currentTimeMillis();
-
-      long delay = deliveryTime - now;
-
-      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
-      runnable.setFuture(future);
-   }
-
    private HandleStatus deliver(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
@@ -811,78 +756,4 @@
       }
    }
 
-   private class ScheduledDeliveryRunnable implements Runnable
-   {
-      private final MessageReference ref;
-
-      private volatile Future<?> future;
-
-      private boolean cancelled;
-
-      public ScheduledDeliveryRunnable(final MessageReference ref)
-      {
-         this.ref = ref;
-      }
-
-      public synchronized void setFuture(final Future<?> future)
-      {
-         if (cancelled)
-         {
-            future.cancel(false);
-         }
-         else
-         {
-            this.future = future;
-         }
-      }
-
-      public synchronized void cancel()
-      {
-         if (future != null)
-         {
-            future.cancel(false);
-         }
-
-         cancelled = true;
-      }
-
-      public MessageReference getReference()
-      {
-         return ref;
-      }
-
-      public void run()
-      {
-         if (trace)
-         {
-            log.trace("Scheduled delivery timeout " + ref);
-         }
-
-         synchronized (scheduledRunnables)
-         {
-            boolean removed = scheduledRunnables.remove(this);
-
-            if (!removed)
-            {
-               log.warn("Failed to remove timeout " + this);
-
-               return;
-            }
-         }
-
-         ref.setScheduledDeliveryTime(0);
-
-         HandleStatus status = deliver(ref);
-
-         if (HandleStatus.HANDLED != status)
-         {
-            // Add back to the front of the queue
-
-            // TODO - need to replicate this so backup node also adds back to
-            // front of queue
-
-            addFirst(ref);
-         }
-      }
-   }
 }

Added: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-10-29 16:54:50 UTC (rev 5207)
@@ -0,0 +1,200 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Handles scheduling deliveries to a queue at the correct time.
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
+{
+   private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
+   {
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public boolean checkAndSchedule(final MessageReference ref, final boolean backup)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {
+         if (trace)
+         {
+            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+         }
+
+         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+         scheduledRunnables.add(runnable);
+
+         if (!backup)
+         {
+            scheduleDelivery(runnable, deliveryTime);
+         }
+
+         return true;
+      }
+      return false;
+   }
+
+   public void reSchedule()
+   {
+      for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+      {
+         scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+      }
+   }
+
+   public int getScheduledCount()
+   {
+      return scheduledRunnables.size();
+   }
+
+   public List<MessageReference> getScheduledMessages()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+         {
+            refs.add(scheduledRunnable.getReference());
+         }
+      }
+      return refs;
+   }
+
+   public List<MessageReference> cancel()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            runnable.cancel();
+            refs.add(runnable.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+      return refs;
+   }
+
+   private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
+   {
+      long now = System.currentTimeMillis();
+
+      long delay = deliveryTime - now;
+
+      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+      runnable.setFuture(future);
+   }
+
+   private class ScheduledDeliveryRunnable implements Runnable
+   {
+      private final MessageReference ref;
+
+      private volatile Future<?> future;
+
+      private boolean cancelled;
+
+      public ScheduledDeliveryRunnable(final MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      public synchronized void setFuture(final Future<?> future)
+      {
+         if (cancelled)
+         {
+            future.cancel(false);
+         }
+         else
+         {
+            this.future = future;
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+
+         cancelled = true;
+      }
+
+      public MessageReference getReference()
+      {
+         return ref;
+      }
+
+      public void run()
+      {
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
+
+         synchronized (scheduledRunnables)
+         {
+            boolean removed = scheduledRunnables.remove(this);
+
+            if (!removed)
+            {
+               log.warn("Failed to remove timeout " + this);
+
+               return;
+            }
+         }
+
+         ref.setScheduledDeliveryTime(0);
+         // TODO - need to replicate this so backup node also adds back to
+         // front of queue
+         ref.getQueue().addFirst(ref);
+
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list