[jboss-cvs] JBoss Messaging SVN: r5220 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/server and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 30 11:37:25 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-30 11:37:25 -0400 (Thu, 30 Oct 2008)
New Revision: 5220
Added:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Modified:
branches/Branch_Chunk_CRS2/build-messaging.xml
branches/Branch_Chunk_CRS2/build.xml
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Merges from trunk (Andy's changes)
Modified: branches/Branch_Chunk_CRS2/build-messaging.xml
===================================================================
--- branches/Branch_Chunk_CRS2/build-messaging.xml 2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/build-messaging.xml 2008-10-30 15:37:25 UTC (rev 5220)
@@ -211,6 +211,20 @@
<path refid="easymock.easymock.classpath" />
</path>
+   <!-- Referenced by the "findbugs" target as the auxClasspath of the FindBugs task. -->
+   <path id="findbugs.classpath">
+      <path refid="core.compilation.classpath"/>
+      <path refid="jms.compilation.classpath"/>
+      <path location="${build.jars.dir}/jboss-${module.name}.jar"/>
+      <path refid="junit.junit.classpath"/>
+      <path refid="jboss.profiler.jvmti.classpath"/>
+      <path refid="jboss.test14.classpath"/>
+      <path refid="jboss.jboss.retro.classpath"/>
+      <path refid="easymock.easymock.classpath"/>
+      <path refid="slf4j.api.classpath"/>
+      <path refid="slf4j.log4j.classpath"/>
+      <path refid="jboss.jbossxb.classpath"/>
+   </path>
+
<path id="unit.test.execution.classpath">
<pathelement location="${test.dir}/config"/>
<pathelement location="${test.dir}/tmpfiles"/>
@@ -946,6 +960,17 @@
</java>
</target>
+   <!--
+      Runs the FindBugs Ant task against jboss-${module.name}.jar and writes an HTML
+      report to bugs.html. ${findbugs.home} must point at a FindBugs installation.
+      The taskdef names no classpath, so FindBugsTask has to be loadable by Ant itself.
+   -->
+   <target name="findbugs" depends="jar">
+      <taskdef classname="edu.umd.cs.findbugs.anttask.FindBugsTask" name="findbugs"/>
+      <findbugs home="${findbugs.home}" output="html" outputFile="bugs.html">
+         <auxClasspath refid="findbugs.classpath"/>
+         <sourcePath path="${src.main.dir}"/>
+         <class location="${build.jars.dir}/jboss-${module.name}.jar"/>
+      </findbugs>
+   </target>
<!-- Examples -->
<target name="queueExample" depends="client-jar">
Modified: branches/Branch_Chunk_CRS2/build.xml
===================================================================
--- branches/Branch_Chunk_CRS2/build.xml 2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/build.xml 2008-10-30 15:37:25 UTC (rev 5220)
@@ -182,6 +182,9 @@
<ant antfile="build-messaging.xml" target="debugServer"/>
</target>
+   <!-- Delegates to the "findbugs" target of build-messaging.xml once createthirdparty has run. -->
+   <target depends="createthirdparty"
+           name="findbugs">
+      <ant target="findbugs" antfile="build-messaging.xml"/>
+   </target>
<!--example targets-->
<target name="queueExample" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="queueExample"/>
Copied: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java (from rev 5219, trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java)
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java (rev 0)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java 2008-10-30 15:37:25 UTC (rev 5220)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+import java.util.List;
+
+/**
+ * Holds back message references that carry a scheduled delivery time and
+ * releases them once that time is reached.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface ScheduledDeliveryHandler
+{
+   // Scheduling ----------------------------------------------------
+
+   /**
+    * Offers a reference to the handler.
+    *
+    * @param ref the reference to inspect
+    * @param backup when {@code true} the reference is only recorded; no
+    *        timer is started until {@link #reSchedule()} is called
+    * @return {@code true} if the handler has taken charge of the reference
+    *         (the caller must not enqueue it itself), {@code false} if the
+    *         reference is not scheduled and the caller should handle it
+    */
+   boolean checkAndSchedule(MessageReference ref, boolean backup);
+
+   /**
+    * Starts a timer for every reference currently held by the handler.
+    * QueueImpl invokes this right after it clears its backup flag.
+    */
+   void reSchedule();
+
+   /**
+    * Cancels every pending scheduled delivery and forgets the references.
+    *
+    * @return the references whose delivery was cancelled
+    */
+   List<MessageReference> cancel();
+
+   // Inspection ----------------------------------------------------
+
+   /**
+    * @return the number of references currently held for scheduled delivery
+    */
+   int getScheduledCount();
+
+   /**
+    * @return the references currently held for scheduled delivery
+    */
+   List<MessageReference> getScheduledMessages();
+}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-30 15:37:25 UTC (rev 5220)
@@ -14,15 +14,11 @@
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.Set;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +37,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -75,13 +72,11 @@
private final boolean temporary;
- private final ScheduledExecutorService scheduledExecutor;
-
private final PostOffice postOffice;
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
- private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+ private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
@@ -128,8 +123,6 @@
this.temporary = temporary;
- this.scheduledExecutor = scheduledExecutor;
-
this.postOffice = postOffice;
if (postOffice == null)
@@ -142,6 +135,8 @@
}
direct = true;
+
+ scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
}
// Queue implementation
@@ -189,7 +184,7 @@
ServerMessage msg = ref.getMessage();
- if (!checkAndSchedule(ref))
+ if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
{
messageReferences.addFirst(ref, msg.getPriority());
}
@@ -305,7 +300,10 @@
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- checkAndSchedule(ref);
+ if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
+ {
+ messageReferences.addFirst(ref, ref.getMessage().getPriority());
+ }
break;
}
@@ -352,17 +350,12 @@
public synchronized int getScheduledCount()
{
- return scheduledRunnables.size();
+ return scheduledDeliveryHandler.getScheduledCount();
}
public synchronized List<MessageReference> getScheduledMessages()
{
- List<MessageReference> refs = new ArrayList<MessageReference>();
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- refs.add(runnable.getReference());
- }
- return refs;
+ return scheduledDeliveryHandler.getScheduledMessages();
}
public int getDeliveringCount()
@@ -432,18 +425,12 @@
iter.remove();
}
- synchronized (scheduledRunnables)
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- runnable.cancel();
+ deliveringCount.incrementAndGet();
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(runnable.getReference());
- }
-
- scheduledRunnables.clear();
+ tx.addAcknowledgement(messageReference);
}
tx.commit();
@@ -598,10 +585,7 @@
backup = false;
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
- {
- scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
- }
+ scheduledDeliveryHandler.reSchedule();
return true;
}
@@ -720,7 +704,7 @@
sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
}
- if (checkAndSchedule(ref))
+ if (scheduledDeliveryHandler.checkAndSchedule(ref, backup))
{
return HandleStatus.HANDLED;
}
@@ -782,42 +766,6 @@
return HandleStatus.HANDLED;
}
- private boolean checkAndSchedule(final MessageReference ref)
- {
- long deliveryTime = ref.getScheduledDeliveryTime();
-
- if (deliveryTime != 0 && scheduledExecutor != null)
- {
- if (trace)
- {
- log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
- }
-
- ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
- scheduledRunnables.add(runnable);
-
- if (!backup)
- {
- scheduleDelivery(runnable, deliveryTime);
- }
-
- return true;
- }
- return false;
- }
-
- private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
- {
- long now = System.currentTimeMillis();
-
- long delay = deliveryTime - now;
-
- Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
- runnable.setFuture(future);
- }
-
private HandleStatus deliver(final MessageReference reference)
{
HandleStatus status = distributionPolicy.distribute(reference);
@@ -870,78 +818,4 @@
}
}
- private class ScheduledDeliveryRunnable implements Runnable
- {
- private final MessageReference ref;
-
- private volatile Future<?> future;
-
- private boolean cancelled;
-
- public ScheduledDeliveryRunnable(final MessageReference ref)
- {
- this.ref = ref;
- }
-
- public synchronized void setFuture(final Future<?> future)
- {
- if (cancelled)
- {
- future.cancel(false);
- }
- else
- {
- this.future = future;
- }
- }
-
- public synchronized void cancel()
- {
- if (future != null)
- {
- future.cancel(false);
- }
-
- cancelled = true;
- }
-
- public MessageReference getReference()
- {
- return ref;
- }
-
- public void run()
- {
- if (trace)
- {
- log.trace("Scheduled delivery timeout " + ref);
- }
-
- synchronized (scheduledRunnables)
- {
- boolean removed = scheduledRunnables.remove(this);
-
- if (!removed)
- {
- log.warn("Failed to remove timeout " + this);
-
- return;
- }
- }
-
- ref.setScheduledDeliveryTime(0);
-
- HandleStatus status = deliver(ref);
-
- if (HandleStatus.HANDLED != status)
- {
- // Add back to the front of the queue
-
- // TODO - need to replicate this so backup node also adds back to
- // front of queue
-
- addFirst(ref);
- }
- }
- }
}
Copied: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java (from rev 5219, trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java)
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java (rev 0)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-10-30 15:37:25 UTC (rev 5220)
@@ -0,0 +1,200 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Handles scheduling deliveries to a queue at the correct time.
+ * <p>
+ * A reference with a non-zero scheduled delivery time is wrapped in a
+ * {@link ScheduledDeliveryRunnable} and remembered in {@code scheduledRunnables}.
+ * When the runnable fires it removes itself from that set and puts the
+ * reference at the front of its queue.
+ * <p>
+ * The runnables are executed by the {@link ScheduledExecutorService}, i.e.
+ * concurrently with the callers of this class, so <em>every</em> access to
+ * {@code scheduledRunnables} is made while holding its monitor.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
+{
+   private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   /** May be {@code null}, in which case nothing is ever scheduled. */
+   private final ScheduledExecutorService scheduledExecutor;
+
+   /** Pending deliveries in insertion order; guarded by its own monitor. */
+   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+   /**
+    * @param scheduledExecutor executor used to time the deliveries; when
+    *        {@code null}, {@link #checkAndSchedule} always returns {@code false}
+    */
+   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
+   {
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   /**
+    * Takes charge of the reference if it has a scheduled delivery time.
+    *
+    * @param ref the reference to inspect
+    * @param backup when {@code true} the reference is only recorded and no
+    *        timer is started; {@link #reSchedule()} starts it later
+    * @return {@code true} if the reference is now held by this handler,
+    *         {@code false} if it has no scheduled delivery time or there is
+    *         no executor
+    */
+   public boolean checkAndSchedule(final MessageReference ref, final boolean backup)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+
+      if (deliveryTime == 0 || scheduledExecutor == null)
+      {
+         return false;
+      }
+
+      if (trace)
+      {
+         log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+      }
+
+      ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+      // Register before scheduling: run() treats a runnable that is not in
+      // the set as cancelled. The lock keeps this add from racing with the
+      // remove() done by runnables firing on the executor.
+      synchronized (scheduledRunnables)
+      {
+         scheduledRunnables.add(runnable);
+      }
+
+      if (!backup)
+      {
+         scheduleDelivery(runnable, deliveryTime);
+      }
+
+      return true;
+   }
+
+   /**
+    * Starts a timer for every reference currently held.
+    */
+   public void reSchedule()
+   {
+      // A runnable whose delivery time has already passed may fire straight
+      // away on the executor and remove itself from the set; holding the
+      // lock makes it wait until the iteration is over instead of
+      // modifying the set underneath the iterator.
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+         }
+      }
+   }
+
+   /**
+    * @return the number of references currently held for scheduled delivery
+    */
+   public int getScheduledCount()
+   {
+      synchronized (scheduledRunnables)
+      {
+         return scheduledRunnables.size();
+      }
+   }
+
+   /**
+    * @return a new list holding the references currently awaiting delivery
+    */
+   public List<MessageReference> getScheduledMessages()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+         {
+            refs.add(scheduledRunnable.getReference());
+         }
+      }
+      return refs;
+   }
+
+   /**
+    * Cancels all pending deliveries and forgets them.
+    *
+    * @return the references whose delivery was cancelled
+    */
+   public List<MessageReference> cancel()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            runnable.cancel();
+            refs.add(runnable.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+      return refs;
+   }
+
+   /**
+    * Submits the runnable to the executor so that it fires at
+    * {@code deliveryTime} (milliseconds, compared against
+    * {@link System#currentTimeMillis()}). A delivery time in the past gives
+    * a negative delay, which the executor treats as "run immediately".
+    */
+   private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
+   {
+      long now = System.currentTimeMillis();
+
+      long delay = deliveryTime - now;
+
+      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+      runnable.setFuture(future);
+   }
+
+   /**
+    * Timer task for a single reference. It removes itself from
+    * {@code scheduledRunnables} when it fires, hence it is an inner
+    * (non-static) class.
+    */
+   private class ScheduledDeliveryRunnable implements Runnable
+   {
+      private final MessageReference ref;
+
+      private volatile Future<?> future;
+
+      private boolean cancelled;
+
+      public ScheduledDeliveryRunnable(final MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      /**
+       * Records the future returned by the executor. If {@link #cancel()}
+       * got in first, the future is cancelled right away instead.
+       */
+      public synchronized void setFuture(final Future<?> future)
+      {
+         if (cancelled)
+         {
+            future.cancel(false);
+         }
+         else
+         {
+            this.future = future;
+         }
+      }
+
+      /**
+       * Cancels the pending future, if any, without interrupting a run in
+       * progress, and marks this runnable as cancelled.
+       */
+      public synchronized void cancel()
+      {
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+
+         cancelled = true;
+      }
+
+      public MessageReference getReference()
+      {
+         return ref;
+      }
+
+      public void run()
+      {
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
+
+         synchronized (scheduledRunnables)
+         {
+            boolean removed = scheduledRunnables.remove(this);
+
+            // Not in the set any more: cancel() cleared it (or this task
+            // already ran), so the reference must not be delivered.
+            if (!removed)
+            {
+               log.warn("Failed to remove timeout " + this);
+
+               return;
+            }
+         }
+
+         ref.setScheduledDeliveryTime(0);
+         // TODO - need to replicate this so backup node also adds back to
+         // front of queue
+         ref.getQueue().addFirst(ref);
+      }
+   }
+}
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-30 15:37:25 UTC (rev 5220)
@@ -67,7 +67,7 @@
protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
- protected String baseDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test";
+ protected String baseDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
protected String journalDir = baseDir + "/journal";
More information about the jboss-cvs-commits
mailing list