[hornetq-commits] JBoss hornetq SVN: r9846 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 4 18:47:34 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-04 18:47:33 -0400 (Thu, 04 Nov 2010)
New Revision: 9846

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
just a backup

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-04 15:05:08 UTC (rev 9845)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-04 22:47:33 UTC (rev 9846)
@@ -37,8 +37,11 @@
    long getId();
 
    boolean isPersistent();
+   
+   /** Used as a delegate method to pageStore.isPaging() */
+   boolean isPaging();
 
-   public LinkedListIterator<PagedReferenceImpl> iterator();
+   public LinkedListIterator<PagedReference> iterator();
 
    // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void close() throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-04 15:05:08 UTC (rev 9845)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-04 22:47:33 UTC (rev 9846)
@@ -129,6 +129,11 @@
       return queue;
    }
    
+   public boolean isPaging()
+   {
+      return pageStore.isPaging();
+   }
+   
    public void setQueue(Queue queue)
    {
       this.queue = queue;
@@ -168,7 +173,7 @@
       ack(position);
    }
 
-   class CursorIterator implements LinkedListIterator<PagedReferenceImpl>
+   class CursorIterator implements LinkedListIterator<PagedReference>
    {
       PagePosition position = getLastPosition();
 
@@ -204,7 +209,7 @@
       /* (non-Javadoc)
        * @see java.util.Iterator#next()
        */
-      public PagedReferenceImpl next()
+      public synchronized PagedReferenceImpl next()
       {
          
          if (cachedNext != null)
@@ -239,7 +244,9 @@
          }
       }
 
-      public boolean hasNext()
+      /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
+       *  It would be a rare race condition but I would prefer avoiding that scenario */
+      public synchronized boolean hasNext()
       {
          // if an unbehaved program called hasNext twice before next, we only cache it once.
          if (cachedNext != null)
@@ -247,6 +254,11 @@
             return true;
          }
          
+         if (!pageStore.isPaging())
+         {
+            return false;
+         }
+         
          cachedNext = next();
 
          return cachedNext != null;
@@ -276,7 +288,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
     */
-   public LinkedListIterator<PagedReferenceImpl> iterator()
+   public LinkedListIterator<PagedReference> iterator()
    {
       return new CursorIterator();
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-04 15:05:08 UTC (rev 9845)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-04 22:47:33 UTC (rev 9846)
@@ -33,6 +33,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -92,6 +93,8 @@
    private final PostOffice postOffice;
    
    private final PageSubscription pageSubscription;
+   
+   private final LinkedListIterator<PagedReference> pageIterator;
 
    private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
 
@@ -109,6 +112,8 @@
 
    private final Runnable deliverRunner = new DeliverRunner();
 
+   private final Runnable depageRunner = new DepageRunner();
+
    private final StorageManager storageManager;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -221,7 +226,12 @@
       if (pageSubscription != null)
       {
          pageSubscription.setQueue(this);
+         this.pageIterator = pageSubscription.iterator();
       }
+      else
+      {
+         this.pageIterator = null;
+      }
 
       this.executor = executor;
 
@@ -339,7 +349,7 @@
       // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
       if (checkDirect)
       {
-         if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty())
+         if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging())
          {
             // We must block on the executor to ensure any async deliveries have completed or we might get out of order
             // deliveries
@@ -1225,7 +1235,30 @@
             pos = 0;
          }
       }
+      
+      if (messageReferences.size() == 0 && pageIterator.hasNext())
+      {
+         scheduleDepage();
+      }
    }
+   
+   private void scheduleDepage()
+   {
+      executor.execute(depageRunner);
+   }
+   
+   private void depage()
+   {
+      int nmessages = 0;
+      while (nmessages < MAX_DELIVERIES_IN_LOOP && pageIterator.hasNext())
+      {
+         nmessages ++;
+         addTail(pageIterator.next(), false);
+         pageIterator.remove();
+      }
+      
+      deliverAsync();
+   }
 
    private void internalAddRedistributor(final Executor executor)
    {
@@ -1716,6 +1749,21 @@
       }
    }
 
+   private class DepageRunner implements Runnable
+   {
+      public void run()
+      {
+         try
+         {
+            depage();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to deliver", e);
+         }
+      }
+   }
+
    private class ConcurrentPoller implements Runnable
    {
       public void run()

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-04 15:05:08 UTC (rev 9845)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-04 22:47:33 UTC (rev 9846)
@@ -15,7 +15,6 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -32,7 +31,7 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagedReferenceImpl;
+import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
@@ -120,9 +119,9 @@
 
       PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
 
-      PagedReferenceImpl msg;
+      PagedReference msg;
 
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
       int key = 0;
       while ((msg = iterator.next()) != null)
       {
@@ -205,11 +204,11 @@
 
       queue.getPageSubscription().close();
 
-      PagedReferenceImpl msg;
+      PagedReference msg;
 
-      LinkedListIterator<PagedReferenceImpl> iteratorEven = cursorEven.iterator();
+      LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
 
-      LinkedListIterator<PagedReferenceImpl> iteratorOdd = cursorOdd.iterator();
+      LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
 
       int key = 0;
       while ((msg = iteratorEven.next()) != null)
@@ -285,12 +284,12 @@
       System.out.println("Cursor: " + cursor);
       cursorProvider.printDebug();
 
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
       for (int i = 0; i < 1000; i++)
       {
          System.out.println("Reading Msg : " + i);
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertNotNull(msg);
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
 
@@ -319,7 +318,7 @@
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertNotNull(msg);
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
 
@@ -361,10 +360,10 @@
                                            .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
       for (int i = 0; i < 100; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
@@ -383,14 +382,14 @@
 
       for (int i = 10; i <= 20; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg);
       }
@@ -422,11 +421,11 @@
 
       Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
 
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
       for (int i = 0; i < 100; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
@@ -449,14 +448,14 @@
 
       for (int i = 10; i <= 20; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx, msg);
       }
 
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         PagedReferenceImpl msg = iterator.next();
+         PagedReference msg = iterator.next();
          assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx, msg);
       }
@@ -490,7 +489,7 @@
 
       System.out.println("Cursor: " + cursor);
 
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
@@ -506,7 +505,7 @@
 
          Assert.assertTrue(pageStore.page(msg));
 
-         PagedReferenceImpl readMessage = iterator.next();
+         PagedReference readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
@@ -544,7 +543,7 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         PagedReferenceImpl readMessage = iterator.next();
+         PagedReference readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
@@ -580,7 +579,7 @@
             Assert.assertTrue(pageStore.page(msg));
          }
 
-         PagedReferenceImpl readMessage = iterator.next();
+         PagedReference readMessage = iterator.next();
 
          assertNotNull(readMessage);
 
@@ -589,7 +588,7 @@
          assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
       }
 
-      PagedReferenceImpl readMessage = iterator.next();
+      PagedReference readMessage = iterator.next();
 
       assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
 
@@ -647,7 +646,7 @@
                                            .getPageStore(ADDRESS)
                                            .getCursorProvier()
                                            .getSubscription(queue.getID());
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
       System.out.println("Cursor: " + cursor);
 
@@ -676,7 +675,7 @@
       // First consume what's already there without any tx as nothing was committed
       for (int i = 300; i < 400; i++)
       {
-         PagedReferenceImpl pos = iterator.next();
+         PagedReference pos = iterator.next();
          assertNotNull("Null at position " + i, pos);
          assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
          cursor.ack(pos);
@@ -693,7 +692,7 @@
       // Second:after pgtxCommit was done
       for (int i = 200; i < 300; i++)
       {
-         PagedReferenceImpl pos = iterator.next();
+         PagedReference pos = iterator.next();
          assertNotNull(pos);
          assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
          cursor.ack(pos);
@@ -724,9 +723,9 @@
 
       queue.getPageSubscription().close();
 
-      PagedReferenceImpl msg;
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
-      LinkedListIterator<PagedReferenceImpl> iterator2 = cursor.iterator();
+      PagedReference msg;
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator2 = cursor.iterator();
 
       int key = 0;
       while ((msg = iterator.next()) != null)
@@ -803,9 +802,9 @@
       msg = null;
 
       cache = null;
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
-      PagedReferenceImpl msgCursor = null;
+      PagedReference msgCursor = null;
       while ((msgCursor = iterator.next()) != null)
       {
          assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
@@ -848,9 +847,9 @@
 
       cache = null;
 
-      LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+      LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
-      PagedReferenceImpl msgCursor = null;
+      PagedReference msgCursor = null;
       while ((msgCursor = iterator.next()) != null)
       {
          assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
@@ -902,15 +901,15 @@
 
       PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
 
-      LinkedListIterator<PagedReferenceImpl> iter = cursor.iterator();
+      LinkedListIterator<PagedReference> iter = cursor.iterator();
       
-      LinkedListIterator<PagedReferenceImpl> iter2 = cursor.iterator();
+      LinkedListIterator<PagedReference> iter2 = cursor.iterator();
       
       assertTrue(iter.hasNext());
       
-      PagedReferenceImpl msg1 = iter.next();
+      PagedReference msg1 = iter.next();
       
-      PagedReferenceImpl msg2 = iter2.next();
+      PagedReference msg2 = iter2.next();
       
       assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
       



More information about the hornetq-commits mailing list