[jboss-cvs] JBoss Messaging SVN: r5207 - in trunk/src/main/org/jboss/messaging/core/server: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 29 12:54:51 EDT 2008
Author: ataylor
Date: 2008-10-29 12:54:50 -0400 (Wed, 29 Oct 2008)
New Revision: 5207
Added:
trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1323 - abstracted out the scheduling functionality
Added: trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java 2008-10-29 16:54:50 UTC (rev 5207)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface ScheduledDeliveryHandler
+{
+ boolean checkAndSchedule(MessageReference ref, boolean backup);
+
+ void reSchedule();
+
+ int getScheduledCount();
+
+ List<MessageReference> getScheduledMessages();
+
+ List<MessageReference> cancel();
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-29 16:52:07 UTC (rev 5206)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-29 16:54:50 UTC (rev 5207)
@@ -14,15 +14,11 @@
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.Set;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,6 +36,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -74,13 +71,11 @@
private final boolean temporary;
- private final ScheduledExecutorService scheduledExecutor;
-
private final PostOffice postOffice;
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
- private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+ private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
@@ -125,11 +120,11 @@
this.temporary = temporary;
- this.scheduledExecutor = scheduledExecutor;
-
this.postOffice = postOffice;
direct = true;
+
+ scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
}
// Queue implementation
@@ -177,7 +172,7 @@
ServerMessage msg = ref.getMessage();
- if (!checkAndSchedule(ref))
+ if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
{
messageReferences.addFirst(ref, msg.getPriority());
}
@@ -309,17 +304,12 @@
public synchronized int getScheduledCount()
{
- return scheduledRunnables.size();
+ return scheduledDeliveryHandler.getScheduledCount();
}
public synchronized List<MessageReference> getScheduledMessages()
{
- List<MessageReference> refs = new ArrayList<MessageReference>();
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- refs.add(runnable.getReference());
- }
- return refs;
+ return scheduledDeliveryHandler.getScheduledMessages();
}
public int getDeliveringCount()
@@ -391,18 +381,12 @@
iter.remove();
}
- synchronized (scheduledRunnables)
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- runnable.cancel();
+ deliveringCount.incrementAndGet();
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(runnable.getReference());
- }
-
- scheduledRunnables.clear();
+ tx.addAcknowledgement(messageReference);
}
tx.commit();
@@ -557,10 +541,7 @@
backup = false;
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
- }
+ scheduledDeliveryHandler.reSchedule();
return true;
}
@@ -679,7 +660,7 @@
sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
}
- if (checkAndSchedule(ref))
+ if (scheduledDeliveryHandler.checkAndSchedule(ref, backup))
{
return HandleStatus.HANDLED;
}
@@ -741,42 +722,6 @@
return HandleStatus.HANDLED;
}
- private boolean checkAndSchedule(final MessageReference ref)
- {
- long deliveryTime = ref.getScheduledDeliveryTime();
-
- if (deliveryTime != 0 && scheduledExecutor != null)
- {
- if (trace)
- {
- log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
- }
-
- ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
- scheduledRunnables.add(runnable);
-
- if (!backup)
- {
- scheduleDelivery(runnable, deliveryTime);
- }
-
- return true;
- }
- return false;
- }
-
- private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
- {
- long now = System.currentTimeMillis();
-
- long delay = deliveryTime - now;
-
- Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
- runnable.setFuture(future);
- }
-
private HandleStatus deliver(final MessageReference reference)
{
HandleStatus status = distributionPolicy.distribute(reference);
@@ -811,78 +756,4 @@
}
}
- private class ScheduledDeliveryRunnable implements Runnable
- {
- private final MessageReference ref;
-
- private volatile Future<?> future;
-
- private boolean cancelled;
-
- public ScheduledDeliveryRunnable(final MessageReference ref)
- {
- this.ref = ref;
- }
-
- public synchronized void setFuture(final Future<?> future)
- {
- if (cancelled)
- {
- future.cancel(false);
- }
- else
- {
- this.future = future;
- }
- }
-
- public synchronized void cancel()
- {
- if (future != null)
- {
- future.cancel(false);
- }
-
- cancelled = true;
- }
-
- public MessageReference getReference()
- {
- return ref;
- }
-
- public void run()
- {
- if (trace)
- {
- log.trace("Scheduled delivery timeout " + ref);
- }
-
- synchronized (scheduledRunnables)
- {
- boolean removed = scheduledRunnables.remove(this);
-
- if (!removed)
- {
- log.warn("Failed to remove timeout " + this);
-
- return;
- }
- }
-
- ref.setScheduledDeliveryTime(0);
-
- HandleStatus status = deliver(ref);
-
- if (HandleStatus.HANDLED != status)
- {
- // Add back to the front of the queue
-
- // TODO - need to replicate this so backup node also adds back to
- // front of queue
-
- addFirst(ref);
- }
- }
- }
}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-10-29 16:54:50 UTC (rev 5207)
@@ -0,0 +1,200 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Handles scheduling deliveries to a queue at the correct time.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
+{
+ private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+ public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
+ {
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ public boolean checkAndSchedule(final MessageReference ref, final boolean backup)
+ {
+ long deliveryTime = ref.getScheduledDeliveryTime();
+
+ if (deliveryTime != 0 && scheduledExecutor != null)
+ {
+ if (trace)
+ {
+ log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+ }
+
+ ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+ scheduledRunnables.add(runnable);
+
+ if (!backup)
+ {
+ scheduleDelivery(runnable, deliveryTime);
+ }
+
+ return true;
+ }
+ return false;
+ }
+
+ public void reSchedule()
+ {
+ for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+ {
+ scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+ }
+ }
+
+ public int getScheduledCount()
+ {
+ return scheduledRunnables.size();
+ }
+
+ public List<MessageReference> getScheduledMessages()
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ synchronized (scheduledRunnables)
+ {
+ for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+ {
+ refs.add(scheduledRunnable.getReference());
+ }
+ }
+ return refs;
+ }
+
+ public List<MessageReference> cancel()
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ synchronized (scheduledRunnables)
+ {
+ for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+ {
+ runnable.cancel();
+ refs.add(runnable.getReference());
+ }
+
+ scheduledRunnables.clear();
+ }
+ return refs;
+ }
+
+ private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
+ {
+ long now = System.currentTimeMillis();
+
+ long delay = deliveryTime - now;
+
+ Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+ runnable.setFuture(future);
+ }
+
+ private class ScheduledDeliveryRunnable implements Runnable
+ {
+ private final MessageReference ref;
+
+ private volatile Future<?> future;
+
+ private boolean cancelled;
+
+ public ScheduledDeliveryRunnable(final MessageReference ref)
+ {
+ this.ref = ref;
+ }
+
+ public synchronized void setFuture(final Future<?> future)
+ {
+ if (cancelled)
+ {
+ future.cancel(false);
+ }
+ else
+ {
+ this.future = future;
+ }
+ }
+
+ public synchronized void cancel()
+ {
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ cancelled = true;
+ }
+
+ public MessageReference getReference()
+ {
+ return ref;
+ }
+
+ public void run()
+ {
+ if (trace)
+ {
+ log.trace("Scheduled delivery timeout " + ref);
+ }
+
+ synchronized (scheduledRunnables)
+ {
+ boolean removed = scheduledRunnables.remove(this);
+
+ if (!removed)
+ {
+ log.warn("Failed to remove timeout " + this);
+
+ return;
+ }
+ }
+
+ ref.setScheduledDeliveryTime(0);
+ // TODO - need to replicate this so backup node also adds back to
+ // front of queue
+ ref.getQueue().addFirst(ref);
+
+ }
+ }
+}
More information about the jboss-cvs-commits
mailing list