[hornetq-commits] JBoss hornetq SVN: r10205 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 14 20:04:22 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-14 20:04:21 -0500 (Mon, 14 Feb 2011)
New Revision: 10205

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://issues.jboss.org/browse/JBPAPP-5926 - fixing the order of delivery for scheduled (redelivery-delayed) messages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-15 01:04:21 UTC (rev 10205)
@@ -129,7 +129,7 @@
 
    Collection<Consumer> getConsumers();
 
-   boolean checkDLQ(MessageReference ref) throws Exception;
+   boolean checkRedelivery(MessageReference ref) throws Exception;
 
    LinkedListIterator<MessageReference> iterator();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-15 01:04:21 UTC (rev 10205)
@@ -808,7 +808,7 @@
 
    public synchronized void cancel(final MessageReference reference) throws Exception
    {
-      if (checkDLQ(reference))
+      if (checkRedelivery(reference))
       {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference))
          {
@@ -1275,6 +1275,10 @@
    {
       for (ConsumerHolder holder : this.consumerList)
       {
+         if (holder.iter != null)
+         {
+            holder.iter.close();
+         }
          holder.iter = null;
       }
    }
@@ -1571,11 +1575,12 @@
       }
    }
 
-   public boolean checkDLQ(final MessageReference reference) throws Exception
+   public boolean checkRedelivery(final MessageReference reference) throws Exception
    {
       ServerMessage message = reference.getMessage();
 
       // TODO: DeliveryCount on paging
+      
       if (message.isDurable() && durable && !reference.isPaged())
       {
          storageManager.updateDeliveryCount(reference);
@@ -1585,6 +1590,7 @@
 
       int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
 
+      // First check DLA
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
          sendToDeadLetterAddress(reference);
@@ -1593,13 +1599,17 @@
       }
       else
       {
+         // Second check Redelivery Delay
          long redeliveryDelay = addressSettings.getRedeliveryDelay();
 
          if (redeliveryDelay > 0)
          {
             reference.setScheduledDeliveryTime(System.currentTimeMillis() + redeliveryDelay);
-
-            storageManager.updateScheduledDeliveryTime(reference);
+            
+            if (message.isDurable() && durable)
+            {
+               storageManager.updateScheduledDeliveryTime(reference);
+            }
          }
 
          deliveringCount.decrementAndGet();
@@ -1982,7 +1992,7 @@
          {
             try
             {
-               if (ref.getQueue().checkDLQ(ref))
+               if (ref.getQueue().checkRedelivery(ref))
                {
                   LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-02-15 01:04:21 UTC (rev 10205)
@@ -13,9 +13,10 @@
 package org.hornetq.core.server.impl;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -23,6 +24,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ScheduledDeliveryHandler;
 
 /**
@@ -40,9 +42,9 @@
    private static final boolean trace = ScheduledDeliveryHandlerImpl.log.isTraceEnabled();
 
    private final ScheduledExecutorService scheduledExecutor;
+   
+   private LinkedList<MessageReference> scheduledReferences = new LinkedList<MessageReference>();
 
-   private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
-
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
    {
       this.scheduledExecutor = scheduledExecutor;
@@ -52,18 +54,18 @@
    {
       long deliveryTime = ref.getScheduledDeliveryTime();
 
-      if (deliveryTime > System.currentTimeMillis() && scheduledExecutor != null)
+      if (deliveryTime > 0 && scheduledExecutor != null)
       {
          if (ScheduledDeliveryHandlerImpl.trace)
          {
             ScheduledDeliveryHandlerImpl.log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
          }
 
-         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref.getScheduledDeliveryTime());
 
-         synchronized (scheduledRunnables)
+         synchronized (scheduledReferences)
          {
-            scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
+            scheduledReferences.add(ref);
          }
 
          scheduleDelivery(runnable, deliveryTime);
@@ -75,19 +77,19 @@
 
    public int getScheduledCount()
    {
-      return scheduledRunnables.size();
+      synchronized (scheduledReferences)
+      {
+         return scheduledReferences.size();
+      }
    }
 
    public List<MessageReference> getScheduledReferences()
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
 
-      synchronized (scheduledRunnables)
+      synchronized (scheduledReferences)
       {
-         for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
-         {
-            refs.add(scheduledRunnable.getReference());
-         }
+         refs.addAll(scheduledReferences);
       }
       return refs;
    }
@@ -96,40 +98,40 @@
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
 
-      synchronized (scheduledRunnables)
+      synchronized (scheduledReferences)
       {
-         Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
-         for (ScheduledDeliveryRunnable runnable : copy.values())
+         Iterator<MessageReference> iter = scheduledReferences.iterator();
+         
+         while (iter.hasNext())
          {
-            if (filter == null || filter.match(runnable.getReference().getMessage()))
+            MessageReference ref = iter.next();
+            if (filter.match(ref.getMessage()))
             {
-               runnable.cancel();
-
-               refs.add(runnable.getReference());
+               iter.remove();
+               refs.add(ref);
             }
          }
-         for (MessageReference ref : refs)
-         {
-            scheduledRunnables.remove(ref.getMessage().getMessageID());
-         }
       }
       return refs;
    }
 
    public MessageReference removeReferenceWithID(final long id)
    {
-      synchronized (scheduledRunnables)
+      synchronized (scheduledReferences)
       {
-         ScheduledDeliveryRunnable runnable = scheduledRunnables.remove(id);
-         if (runnable == null)
+         Iterator<MessageReference> iter = scheduledReferences.iterator();
+         while (iter.hasNext())
          {
-            return null;
+            MessageReference ref = iter.next();
+            if (ref.getMessage().getMessageID() == id)
+            {
+               iter.remove();
+               return ref;
+            }
          }
-         else
-         {
-            return runnable.getReference();
-         }
       }
+      
+      return null;
    }
 
    private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
@@ -137,81 +139,65 @@
       long now = System.currentTimeMillis();
 
       long delay = deliveryTime - now;
+      
+      if (delay < 0)
+      {
+         delay = 0;
+      }
 
-      Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
-      runnable.setFuture(future);
+      scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
    }
 
    private class ScheduledDeliveryRunnable implements Runnable
    {
-      private final MessageReference ref;
+      private final long scheduledTime;
 
-      private volatile Future<?> future;
-
-      private boolean cancelled;
-
-      public ScheduledDeliveryRunnable(final MessageReference ref)
+      public ScheduledDeliveryRunnable(final long scheduledTime)
       {
-         this.ref = ref;
+         this.scheduledTime = scheduledTime;
       }
 
-      public synchronized void setFuture(final Future<?> future)
-      {
-         if (cancelled)
-         {
-            future.cancel(false);
-         }
-         else
-         {
-            this.future = future;
-         }
-      }
-
-      public synchronized void cancel()
-      {
-         if (future != null)
-         {
-            future.cancel(false);
-         }
-
-         cancelled = true;
-      }
-
-      public MessageReference getReference()
-      {
-         return ref;
-      }
-
       public void run()
       {
-         if (ScheduledDeliveryHandlerImpl.trace)
-         {
-            ScheduledDeliveryHandlerImpl.log.trace("Scheduled delivery timeout " + ref);
-         }
+         HashSet<Queue> queues = new HashSet<Queue>();
+         LinkedList<MessageReference> references = new LinkedList<MessageReference>();
 
-         synchronized (scheduledRunnables)
+         synchronized (scheduledReferences)
          {
-            Object removed = scheduledRunnables.remove(ref.getMessage().getMessageID());
-
-            if (removed == null)
+            
+            Iterator<MessageReference> iter = scheduledReferences.iterator();
+            while (iter.hasNext())
             {
-               ScheduledDeliveryHandlerImpl.log.warn("Failed to remove timeout " + this);
+               MessageReference reference = iter.next();
+               if (reference.getScheduledDeliveryTime() == this.scheduledTime)
+               {
+                  iter.remove();
 
-               return;
+                  reference.setScheduledDeliveryTime(0);
+                  
+                  references.add(reference);
+                  
+                  queues.add(reference.getQueue());
+               }
             }
          }
-
-         ref.setScheduledDeliveryTime(0);
-
-         synchronized (ref.getQueue())
+         
+         for (MessageReference reference : references)
          {
-            ref.getQueue().resetAllIterators();
-
-            ref.getQueue().addHead(ref);
-            
-            ref.getQueue().deliverAsync();
+            reference.getQueue().addHead(reference);
          }
+         
+         // Just to speed up GC
+         references.clear();
+         
+         for (Queue queue : queues)
+         {
+            synchronized (queue)
+            {
+               queue.resetAllIterators();
+               queue.deliverAsync();
+            }
+         }
       }
    }
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2011-02-15 01:04:21 UTC (rev 10205)
@@ -15,9 +15,15 @@
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
@@ -31,10 +37,9 @@
 {
 
    // Constants -----------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(OrderTest.class);
 
-
    // Attributes ----------------------------------------------------
 
    private HornetQServer server;
@@ -78,7 +83,6 @@
       server = createServer(persistent, true);
       server.start();
 
-
       locator.setBlockOnNonDurableSend(false);
       locator.setBlockOnDurableSend(false);
       locator.setBlockOnAcknowledge(true);
@@ -126,7 +130,7 @@
                if (!started || started && i % 2 == 0)
                {
                   ClientMessage msg = cons.receive(10000);
-                  
+
                   Assert.assertEquals(i, msg.getIntProperty("id").intValue());
                }
             }
@@ -140,7 +144,7 @@
                if (!started || started && i % 2 == 0)
                {
                   ClientMessage msg = cons.receive(10000);
-               
+
                   Assert.assertEquals(i, msg.getIntProperty("id").intValue());
                }
             }
@@ -170,9 +174,9 @@
    public void doTestOverCancel(final boolean persistent) throws Exception
    {
       server = createServer(persistent, true);
+
       server.start();
 
-
       locator.setBlockOnNonDurableSend(false);
       locator.setBlockOnDurableSend(false);
       locator.setBlockOnAcknowledge(false);
@@ -237,6 +241,94 @@
 
    }
 
+   public void testOrderOverSessionClosePersistentWithRedeliveryDelay() throws Exception
+   {
+      doTestOverCancelWithRedelivery(true);
+   }
+
+   public void testOrderOverSessionCloseNonPersistentWithRedeliveryDelay() throws Exception
+   {
+      doTestOverCancelWithRedelivery(false);
+   }
+
+
+   public void doTestOverCancelWithRedelivery(final boolean persistent) throws Exception
+   {
+      server = createServer(persistent, true);
+
+      server.getAddressSettingsRepository().clear();
+      AddressSettings setting = new AddressSettings();
+      setting.setRedeliveryDelay(1000);
+      server.getAddressSettingsRepository().addMatch("#", setting);
+
+      server.start();
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(false);
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+      ClientSession session = sf.createSession(true, true, 0);
+
+      int numberOfMessages = 500;
+
+      try
+      {
+         session.createQueue("queue", "queue", true);
+
+         ClientProducer prod = session.createProducer("queue");
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = session.createMessage(i % 2 == 0);
+            msg.putIntProperty("id", i);
+            prod.send(msg);
+         }
+
+         session.close();
+
+         session = sf.createSession(false, false);;
+         
+         session.start();
+         
+         ClientConsumer cons = session.createConsumer("queue");
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            msg.acknowledge();
+            assertEquals(i, msg.getIntProperty("id").intValue());
+         }
+         session.close();
+
+         
+         session = sf.createSession(false, false);;
+         
+         session.start();
+         
+         cons = session.createConsumer("queue");
+         
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            assertEquals(i, msg.getIntProperty("id").intValue());
+         }
+         
+         // receive again
+         session.commit();
+         session.close();
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-15 01:04:21 UTC (rev 10205)
@@ -211,7 +211,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
     */
-   public boolean checkDLQ(final MessageReference ref) throws Exception
+   public boolean checkRedelivery(final MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
       return false;



More information about the hornetq-commits mailing list