[jboss-cvs] JBoss Messaging SVN: r5220 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/server and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 30 11:37:25 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-30 11:37:25 -0400 (Thu, 30 Oct 2008)
New Revision: 5220

Added:
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
Modified:
   branches/Branch_Chunk_CRS2/build-messaging.xml
   branches/Branch_Chunk_CRS2/build.xml
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Merges from trunk (Andy's changes)

Modified: branches/Branch_Chunk_CRS2/build-messaging.xml
===================================================================
--- branches/Branch_Chunk_CRS2/build-messaging.xml	2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/build-messaging.xml	2008-10-30 15:37:25 UTC (rev 5220)
@@ -211,6 +211,20 @@
       <path refid="easymock.easymock.classpath" />
    </path>
 
+   <path id="findbugs.classpath">
+      <path refid="core.compilation.classpath"/>
+      <path refid="jms.compilation.classpath"/>
+      <path location="${build.jars.dir}/jboss-${module.name}.jar"/>
+      <path refid="junit.junit.classpath"/>
+      <path refid="jboss.profiler.jvmti.classpath"/>
+      <path refid="jboss.test14.classpath"/>
+      <path refid="jboss.jboss.retro.classpath"/>
+      <path refid="easymock.easymock.classpath" />
+      <path refid="slf4j.api.classpath"/>
+      <path refid="slf4j.log4j.classpath"/>
+      <path refid="jboss.jbossxb.classpath"/>
+   </path>
+
    <path id="unit.test.execution.classpath">
       <pathelement location="${test.dir}/config"/>
       <pathelement location="${test.dir}/tmpfiles"/>
@@ -946,6 +960,17 @@
       </java>
    </target>
 
+   <target name="findbugs" depends="jar">
+      <taskdef name="findbugs" classname="edu.umd.cs.findbugs.anttask.FindBugsTask"/>
+      <findbugs home="${findbugs.home}"
+                   output="html"
+                   outputFile="bugs.html" >
+           <auxClasspath refid="findbugs.classpath"/>
+           <sourcePath path="${src.main.dir}" />
+           <class location="${build.jars.dir}/jboss-${module.name}.jar" />
+         </findbugs>
+
+   </target>
    <!-- Examples -->
 
    <target name="queueExample" depends="client-jar">

Modified: branches/Branch_Chunk_CRS2/build.xml
===================================================================
--- branches/Branch_Chunk_CRS2/build.xml	2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/build.xml	2008-10-30 15:37:25 UTC (rev 5220)
@@ -182,6 +182,9 @@
       <ant antfile="build-messaging.xml" target="debugServer"/>
    </target>
 
+    <target name="findbugs" depends="createthirdparty">
+       <ant antfile="build-messaging.xml" target="findbugs"/>
+    </target>
    <!--example targets-->
    <target name="queueExample" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="queueExample"/>

Copied: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java (from rev 5219, trunk/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java)
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	                        (rev 0)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/ScheduledDeliveryHandler.java	2008-10-30 15:37:25 UTC (rev 5220)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface ScheduledDeliveryHandler
+{
+   boolean checkAndSchedule(MessageReference ref, boolean backup);
+
+   void reSchedule();
+
+   int getScheduledCount();
+
+   List<MessageReference> getScheduledMessages();
+
+   List<MessageReference> cancel();
+}

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-30 15:37:25 UTC (rev 5220)
@@ -14,15 +14,11 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -41,6 +37,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -75,13 +72,11 @@
 
    private final boolean temporary;
 
-   private final ScheduledExecutorService scheduledExecutor;
-
    private final PostOffice postOffice;
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
-   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+   private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private volatile DistributionPolicy distributionPolicy = new RoundRobinDistributionPolicy();
 
@@ -128,8 +123,6 @@
 
       this.temporary = temporary;
 
-      this.scheduledExecutor = scheduledExecutor;
-
       this.postOffice = postOffice;
       
       if (postOffice == null)
@@ -142,6 +135,8 @@
       }
 
       direct = true;
+
+      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
    }
 
    // Queue implementation
@@ -189,7 +184,7 @@
          
          ServerMessage msg = ref.getMessage();
 
-         if (!checkAndSchedule(ref))
+         if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
          {
             messageReferences.addFirst(ref, msg.getPriority());
          }
@@ -305,7 +300,10 @@
             
             ref.setScheduledDeliveryTime(scheduledDeliveryTime);
             
-            checkAndSchedule(ref);
+            if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
+            {
+               messageReferences.addFirst(ref, ref.getMessage().getPriority());
+            }
 
             break;
          }
@@ -352,17 +350,12 @@
 
    public synchronized int getScheduledCount()
    {
-      return scheduledRunnables.size();
+      return scheduledDeliveryHandler.getScheduledCount();
    }
 
    public synchronized List<MessageReference> getScheduledMessages()
    {
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-      {
-         refs.add(runnable.getReference());
-      }
-      return refs;
+      return scheduledDeliveryHandler.getScheduledMessages();
    }
 
    public int getDeliveringCount()
@@ -432,18 +425,12 @@
          iter.remove();
       }
 
-      synchronized (scheduledRunnables)
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      for (MessageReference messageReference : cancelled)
       {
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            runnable.cancel();
+          deliveringCount.incrementAndGet();
 
-            deliveringCount.incrementAndGet();
-
-            tx.addAcknowledgement(runnable.getReference());
-         }
-
-         scheduledRunnables.clear();
+          tx.addAcknowledgement(messageReference);
       }
 
       tx.commit();
@@ -598,10 +585,7 @@
 
          backup = false;
 
-         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
-         {
-            scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
-         }
+         scheduledDeliveryHandler.reSchedule();
 
          return true;
       }
@@ -720,7 +704,7 @@
          sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
       }
 
-      if (checkAndSchedule(ref))
+      if (scheduledDeliveryHandler.checkAndSchedule(ref, backup))
       {
          return HandleStatus.HANDLED;
       }
@@ -782,42 +766,6 @@
       return HandleStatus.HANDLED;
    }
 
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long deliveryTime = ref.getScheduledDeliveryTime();
-
-      if (deliveryTime != 0 && scheduledExecutor != null)
-      {
-         if (trace)
-         {
-            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
-         }
-
-         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
-         scheduledRunnables.add(runnable);
-
-         if (!backup)
-         {
-            scheduleDelivery(runnable, deliveryTime);
-         }
-
-         return true;
-      }
-      return false;
-   }
-
-   private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
-   {
-      long now = System.currentTimeMillis();
-
-      long delay = deliveryTime - now;
-
-      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
-      runnable.setFuture(future);
-   }
-
    private HandleStatus deliver(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
@@ -870,78 +818,4 @@
       }
    }
 
-   private class ScheduledDeliveryRunnable implements Runnable
-   {
-      private final MessageReference ref;
-
-      private volatile Future<?> future;
-
-      private boolean cancelled;
-
-      public ScheduledDeliveryRunnable(final MessageReference ref)
-      {
-         this.ref = ref;
-      }
-
-      public synchronized void setFuture(final Future<?> future)
-      {
-         if (cancelled)
-         {
-            future.cancel(false);
-         }
-         else
-         {
-            this.future = future;
-         }
-      }
-
-      public synchronized void cancel()
-      {
-         if (future != null)
-         {
-            future.cancel(false);
-         }
-
-         cancelled = true;
-      }
-
-      public MessageReference getReference()
-      {
-         return ref;
-      }
-
-      public void run()
-      {
-         if (trace)
-         {
-            log.trace("Scheduled delivery timeout " + ref);
-         }
-
-         synchronized (scheduledRunnables)
-         {
-            boolean removed = scheduledRunnables.remove(this);
-
-            if (!removed)
-            {
-               log.warn("Failed to remove timeout " + this);
-
-               return;
-            }
-         }
-
-         ref.setScheduledDeliveryTime(0);
-
-         HandleStatus status = deliver(ref);
-
-         if (HandleStatus.HANDLED != status)
-         {
-            // Add back to the front of the queue
-
-            // TODO - need to replicate this so backup node also adds back to
-            // front of queue
-
-            addFirst(ref);
-         }
-      }
-   }
 }

Copied: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java (from rev 5219, trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java)
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	                        (rev 0)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-10-30 15:37:25 UTC (rev 5220)
@@ -0,0 +1,200 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Handles scheduling deliveries to a queue at the correct time.
+ * 
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
+{
+   private static final Logger log = Logger.getLogger(ScheduledDeliveryHandlerImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
+
+   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
+   {
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public boolean checkAndSchedule(final MessageReference ref, final boolean backup)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {
+         if (trace)
+         {
+            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+         }
+
+         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+         scheduledRunnables.add(runnable);
+
+         if (!backup)
+         {
+            scheduleDelivery(runnable, deliveryTime);
+         }
+
+         return true;
+      }
+      return false;
+   }
+
+   public void reSchedule()
+   {
+      for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+      {
+         scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
+      }
+   }
+
+   public int getScheduledCount()
+   {
+      return scheduledRunnables.size();
+   }
+
+   public List<MessageReference> getScheduledMessages()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables)
+         {
+            refs.add(scheduledRunnable.getReference());
+         }
+      }
+      return refs;
+   }
+
+   public List<MessageReference> cancel()
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      synchronized (scheduledRunnables)
+      {
+         for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+         {
+            runnable.cancel();
+            refs.add(runnable.getReference());
+         }
+
+         scheduledRunnables.clear();
+      }
+      return refs;
+   }
+
+   private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
+   {
+      long now = System.currentTimeMillis();
+
+      long delay = deliveryTime - now;
+
+      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+
+      runnable.setFuture(future);
+   }
+
+   private class ScheduledDeliveryRunnable implements Runnable
+   {
+      private final MessageReference ref;
+
+      private volatile Future<?> future;
+
+      private boolean cancelled;
+
+      public ScheduledDeliveryRunnable(final MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      public synchronized void setFuture(final Future<?> future)
+      {
+         if (cancelled)
+         {
+            future.cancel(false);
+         }
+         else
+         {
+            this.future = future;
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+
+         cancelled = true;
+      }
+
+      public MessageReference getReference()
+      {
+         return ref;
+      }
+
+      public void run()
+      {
+         if (trace)
+         {
+            log.trace("Scheduled delivery timeout " + ref);
+         }
+
+         synchronized (scheduledRunnables)
+         {
+            boolean removed = scheduledRunnables.remove(this);
+
+            if (!removed)
+            {
+               log.warn("Failed to remove timeout " + this);
+
+               return;
+            }
+         }
+
+         ref.setScheduledDeliveryTime(0);
+         // TODO - need to replicate this so backup node also adds back to
+         // front of queue
+         ref.getQueue().addFirst(ref);
+
+      }
+   }
+}

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-30 14:53:57 UTC (rev 5219)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-30 15:37:25 UTC (rev 5220)
@@ -67,7 +67,7 @@
 
    protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
 
-   protected String baseDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test";
+   protected String baseDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
    
    protected String journalDir = baseDir + "/journal";
 




More information about the jboss-cvs-commits mailing list