[hornetq-commits] JBoss hornetq SVN: r10209 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/postoffice/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 15 17:30:27 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-15 17:30:27 -0500 (Tue, 15 Feb 2011)
New Revision: 10209

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-5926 - further fixes to the ordering of scheduled message delivery

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -173,7 +173,7 @@
             // This file is linked to another message, deleting the reference where it belongs on this case
             linkMessage.decrementDelayDeletionCount();
          }
-         else
+         else if (delayDeletionCount.get() <= 0)
          {
             if (LargeServerMessageImpl.isTrace)
             {
@@ -192,18 +192,27 @@
       }
    }
 
+   public int incrementRefCount() throws Exception
+   {
+      int value = super.incrementRefCount();
+
+//      new Exception("increment, value=" + value +
+//                    " on msgCount = " +
+//                    this.getIntProperty("counter-message") +
+//                    " messageID=" +
+//                    this.getMessageID()).printStackTrace();
+
+      return value;
+   }
+
+   static int deleted = 0;
+
    @Override
    public synchronized int decrementRefCount() throws Exception
    {
       int currentRefCount = super.decrementRefCount();
 
-      // We use <= as this could be used by load.
-      // because of a failure, no references were loaded, so we have 0... and we still need to delete the associated
-      // files
-      if (delayDeletionCount.get() <= 0)
-      {
-         checkDelete();
-      }
+      checkDelete();
 
       return currentRefCount;
    }
@@ -315,7 +324,7 @@
             file = storageManager.createFileForLargeMessage(getMessageID(), durable);
 
             file.open();
-            
+
             bodySize = file.size();
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -466,6 +466,9 @@
 
    public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
+      
+      addressSettingsRepository.clear();
+      
       Binding binding = addressManager.removeBinding(uniqueName);
 
       if (binding == null)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -68,7 +68,7 @@
 
    void cancel(Transaction tx, MessageReference ref) throws Exception;
 
-   void cancel(MessageReference reference) throws Exception;
+   void cancel(MessageReference reference, long timeBase) throws Exception;
 
    void deliverAsync();
 
@@ -129,7 +129,7 @@
 
    Collection<Consumer> getConsumers();
 
-   boolean checkRedelivery(MessageReference ref) throws Exception;
+   boolean checkRedelivery(MessageReference ref, long timeBase) throws Exception;
 
    LinkedListIterator<MessageReference> iterator();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -21,7 +21,7 @@
  */
 public interface ScheduledDeliveryHandler
 {
-   boolean checkAndSchedule(MessageReference ref);
+   boolean checkAndSchedule(MessageReference ref, final boolean tail);
 
    int getScheduledCount();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -189,12 +189,14 @@
       }
 
       Queue queue = null;
+      
+      long timeBase = System.currentTimeMillis();
 
       for (MessageReference ref2 : list)
       {
          queue = ref2.getQueue();
 
-         queue.cancel(ref2);
+         queue.cancel(ref2, timeBase);
       }
 
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -1011,6 +1011,8 @@
 
    public void destroyQueue(final SimpleString queueName, final ServerSession session) throws Exception
    {
+      addressSettingsRepository.clear();
+
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding == null)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -317,7 +317,7 @@
    /* Called when a message is cancelled back into the queue */
    public synchronized void addHead(final MessageReference ref)
    {
-      if (scheduledDeliveryHandler.checkAndSchedule(ref))
+      if (scheduledDeliveryHandler.checkAndSchedule(ref, false))
       {
          return;
       }
@@ -330,7 +330,7 @@
    public synchronized void reload(final MessageReference ref)
    {
       queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
-      if (!scheduledDeliveryHandler.checkAndSchedule(ref))
+      if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
       {
          internalAddTail(ref);
       }
@@ -347,7 +347,7 @@
 
    public void addTail(final MessageReference ref, final boolean direct)
    {
-      if (scheduledDeliveryHandler.checkAndSchedule(ref))
+      if (scheduledDeliveryHandler.checkAndSchedule(ref, true))
       {
          synchronized (this)
          {
@@ -806,11 +806,11 @@
       getRefsOperation(tx).addAck(reference);
    }
 
-   public synchronized void cancel(final MessageReference reference) throws Exception
+   public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception
    {
-      if (checkRedelivery(reference))
+      if (checkRedelivery(reference, timeBase))
       {
-         if (!scheduledDeliveryHandler.checkAndSchedule(reference))
+         if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
          {
             internalAddHead(reference);
          }
@@ -1275,10 +1275,6 @@
    {
       for (ConsumerHolder holder : this.consumerList)
       {
-         if (holder.iter != null)
-         {
-            holder.iter.close();
-         }
          holder.iter = null;
       }
    }
@@ -1575,7 +1571,7 @@
       }
    }
 
-   public boolean checkRedelivery(final MessageReference reference) throws Exception
+   public boolean checkRedelivery(final MessageReference reference, final long timeBase) throws Exception
    {
       ServerMessage message = reference.getMessage();
 
@@ -1604,7 +1600,7 @@
 
          if (redeliveryDelay > 0)
          {
-            reference.setScheduledDeliveryTime(System.currentTimeMillis() + redeliveryDelay);
+            reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
             
             if (message.isDurable() && durable)
             {
@@ -1987,12 +1983,14 @@
       public void afterRollback(final Transaction tx)
       {
          Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>();
+         
+         long timeBase = System.currentTimeMillis();
 
          for (MessageReference ref : refsToAck)
          {
             try
             {
-               if (ref.getQueue().checkRedelivery(ref))
+               if (ref.getQueue().checkRedelivery(ref, timeBase))
                {
                   LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -43,6 +43,8 @@
 
    private final ScheduledExecutorService scheduledExecutor;
    
+   private final Object lockDelivery = new Object();
+   
    private LinkedList<MessageReference> scheduledReferences = new LinkedList<MessageReference>();
 
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
@@ -50,7 +52,7 @@
       this.scheduledExecutor = scheduledExecutor;
    }
 
-   public boolean checkAndSchedule(final MessageReference ref)
+   public boolean checkAndSchedule(final MessageReference ref, final boolean tail)
    {
       long deliveryTime = ref.getScheduledDeliveryTime();
 
@@ -65,7 +67,14 @@
 
          synchronized (scheduledReferences)
          {
-            scheduledReferences.add(ref);
+            if (tail)
+            {
+               scheduledReferences.add(ref);
+            }
+            else
+            {
+               scheduledReferences.addFirst(ref);
+            }
          }
 
          scheduleDelivery(runnable, deliveryTime);
@@ -162,41 +171,43 @@
          HashSet<Queue> queues = new HashSet<Queue>();
          LinkedList<MessageReference> references = new LinkedList<MessageReference>();
 
-         synchronized (scheduledReferences)
+         synchronized (lockDelivery)
          {
-            
-            Iterator<MessageReference> iter = scheduledReferences.iterator();
-            while (iter.hasNext())
+            synchronized (scheduledReferences)
             {
-               MessageReference reference = iter.next();
-               if (reference.getScheduledDeliveryTime() == this.scheduledTime)
+               
+               Iterator<MessageReference> iter = scheduledReferences.iterator();
+               while (iter.hasNext())
                {
-                  iter.remove();
-
-                  reference.setScheduledDeliveryTime(0);
-                  
-                  references.add(reference);
-                  
-                  queues.add(reference.getQueue());
+                  MessageReference reference = iter.next();
+                  if (reference.getScheduledDeliveryTime() <= this.scheduledTime)
+                  {
+                     iter.remove();
+   
+                     reference.setScheduledDeliveryTime(0);
+                     
+                     references.add(reference);
+                     
+                     queues.add(reference.getQueue());
+                  }
                }
             }
-         }
-         
-         for (MessageReference reference : references)
-         {
-            reference.getQueue().addHead(reference);
-         }
-         
-         // Just to speed up GC
-         references.clear();
-         
-         for (Queue queue : queues)
-         {
-            synchronized (queue)
+            
+            for (MessageReference reference : references)
             {
-               queue.resetAllIterators();
-               queue.deliverAsync();
+               reference.getQueue().addTail(reference);
             }
+            
+            // Just to speed up GC
+            references.clear();
+            
+            for (Queue queue : queues)
+            {
+               synchronized (queue)
+               {
+                  queue.deliverAsync();
+               }
+            }
          }
       }
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -62,4 +62,6 @@
     * clear the repository
     */
    void clear();
+   
+   int getCacheSize();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -76,6 +76,11 @@
       matches.put(match, match1);
       onChange();
    }
+   
+   public int getCacheSize()
+   {
+      return cache.size();
+   }
 
    /**
     * return the value held against the nearest match
@@ -153,6 +158,7 @@
    public void removeMatch(final String match)
    {
       matches.remove(match);
+      new Exception("Clearing cache").printStackTrace();
       cache.clear();
       onChange();
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -258,7 +258,7 @@
 
       server.getAddressSettingsRepository().clear();
       AddressSettings setting = new AddressSettings();
-      setting.setRedeliveryDelay(1000);
+      setting.setRedeliveryDelay(500);
       server.getAddressSettingsRepository().addMatch("#", setting);
 
       server.start();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -92,6 +92,29 @@
       session.close();
    }
    
+
+   public void testMemoryLeakOnAddressSettingForTemporaryQueue() throws Exception
+   {
+      for (int i = 0 ; i < 1000; i++)
+      {
+         SimpleString queue = RandomUtil.randomSimpleString();
+         SimpleString address = RandomUtil.randomSimpleString();
+         session.createTemporaryQueue(address, queue);
+         
+         session.close();
+         session = sf.createSession();
+      }
+      
+      
+      session.close();
+      
+      sf.close();
+      
+      System.out.println("size = " + server.getAddressSettingsRepository().getCacheSize());
+      
+      assertTrue(server.getAddressSettingsRepository().getCacheSize() < 10);
+   }
+   
    public void testPaginStoreIsRemovedWhenQueueIsDeleted() throws Exception
    {
       SimpleString queue = RandomUtil.randomSimpleString();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -166,7 +166,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
     */
-   public void cancel(final MessageReference reference) throws Exception
+   public void cancel(final MessageReference reference, final long timeBase) throws Exception
    {
       // TODO Auto-generated method stub
 
@@ -211,7 +211,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
     */
-   public boolean checkRedelivery(final MessageReference ref) throws Exception
+   public boolean checkRedelivery(final MessageReference ref, final long timeBase) throws Exception
    {
       // TODO Auto-generated method stub
       return false;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-02-15 21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-02-15 22:30:27 UTC (rev 10209)
@@ -494,16 +494,19 @@
       File largeMessagesFileDir = new File(getLargeMessagesDir());
 
       // Deleting the file is async... we keep looking for a period of the time until the file is really gone
-      for (int i = 0; i < 100; i++)
+      long timeout = System.currentTimeMillis() + 5000;
+      while (timeout > System.currentTimeMillis() && largeMessagesFileDir.listFiles().length != expect)
       {
-         if (largeMessagesFileDir.listFiles().length != expect)
+         Thread.sleep(100);
+      }
+      
+      
+      if (expect != largeMessagesFileDir.listFiles().length)
+      {
+         for (File file : largeMessagesFileDir.listFiles())
          {
-            Thread.sleep(10);
+            System.out.println("File " + file + " still on ");
          }
-         else
-         {
-            break;
-         }
       }
 
       Assert.assertEquals(expect, largeMessagesFileDir.listFiles().length);



More information about the hornetq-commits mailing list