[hornetq-commits] JBoss hornetq SVN: r9802 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 20 16:50:34 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-20 16:50:34 -0400 (Wed, 20 Oct 2010)
New Revision: 9802

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding bookmarking for starting a cursor on a later position

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-20 20:50:34 UTC (rev 9802)
@@ -32,6 +32,8 @@
    // To be called before the server is down
    void stop();
    
+   void bookmark(PagePosition position) throws Exception;
+   
    /** It will be 0 if non persistent cursor */
    public long getId();
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-20 20:50:34 UTC (rev 9802)
@@ -74,6 +74,8 @@
    // to be used on tests -------------------------------------------
    
    int getCacheSize();
+   
+   void printDebug();
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-20 20:50:34 UTC (rev 9802)
@@ -39,7 +39,6 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -110,9 +109,28 @@
 
    public PageCursorProvider getProvider()
    {
-      return this.cursorProvider;
+      return cursorProvider;
    }
    
+   public void bookmark(PagePosition position) throws Exception
+   {
+      if (lastPosition != null)
+      {
+         throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
+      }
+      
+      lastPosition = position;
+      
+      PageCursorInfo cursorInfo = getPageInfo(position);
+      
+      if (position.getMessageNr() > 0)
+      {
+         cursorInfo.confirmed.addAndGet(position.getMessageNr() - 1);
+      }
+      
+      ack(position);
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
@@ -195,7 +213,6 @@
       installTXCallback(tx, position);
 
    }
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
@@ -212,7 +229,6 @@
       }
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
     */
@@ -242,27 +258,24 @@
       installTXCallback(tx, position);
    }
 
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void positionIgnored(PagePosition position)
+   public void positionIgnored(final PagePosition position)
    {
       processACK(position);
    }
-   
-   
+
    /**
     * All the data associated with the cursor should go away here
     */
-   public void close()  throws Exception
+   public void close() throws Exception
    {
       final long tx = store.generateUniqueID();
-      
+
       final ArrayList<Exception> ex = new ArrayList<Exception>();
-      
+
       final AtomicBoolean isPersistent = new AtomicBoolean(false);
-      
 
       // We can't delete the records at the caller's thread
       // because an executor may be holding the synchronized on PageCursorImpl
@@ -295,24 +308,24 @@
             catch (Exception e)
             {
                ex.add(e);
-               log.warn(e.getMessage(), e);
+               PageCursorImpl.log.warn(e.getMessage(), e);
             }
          }
       });
-       
+
       Future future = new Future();
-      
+
       executor.execute(future);
-      
+
       while (!future.await(5000))
       {
-         log.warn("Timeout on waiting cursor " + this + " to be closed");
+         PageCursorImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
       }
-      
-      
+
       if (isPersistent.get())
       {
-         // Another reason to perform the commit at the main thread is because the OperationContext may only send the result to the client when
+         // Another reason to perform the commit at the main thread is because the OperationContext may only send the
+         // result to the client when
          // the IO on commit is done
          if (ex.size() == 0)
          {
@@ -324,10 +337,9 @@
             throw ex.get(0);
          }
       }
-      
+
       cursorProvider.close(this);
    }
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#getId()
@@ -337,8 +349,6 @@
       return cursorId;
    }
 
-
-   
    public void processReload() throws Exception
    {
       if (recoveredACK != null)
@@ -397,29 +407,29 @@
             previousPos = pos;
          }
 
-         this.lastAckedPosition = lastPosition;
+         lastAckedPosition = lastPosition;
 
          recoveredACK.clear();
          recoveredACK = null;
       }
    }
-   
+
    public void stop()
    {
       Future future = new Future();
       executor.execute(future);
       while (!future.await(1000))
       {
-         log.warn("Waiting page cursor to finish executors - " + this);
+         PageCursorImpl.log.warn("Waiting page cursor to finish executors - " + this);
       }
    }
 
    public void printDebug()
    {
-      printDebug(this.toString());
+      printDebug(toString());
    }
-   
-   public void printDebug(String msg)
+
+   public void printDebug(final String msg)
    {
       System.out.println("Debug information on PageCurorImpl- " + msg);
       for (PageCursorInfo info : consumedPages.values())
@@ -469,7 +479,7 @@
             // there's a different page being acked, we will do the check right away
             scheduleCleanupCheck();
          }
-         this.lastAckedPosition = pos;
+         lastAckedPosition = pos;
       }
       PageCursorInfo info = getPageInfo(pos);
 
@@ -550,7 +560,7 @@
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+                  PageCursorImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
                }
                else
                {
@@ -588,7 +598,7 @@
          {
             executor.execute(new Runnable()
             {
-               
+
                public void run()
                {
                   synchronized (PageCursorImpl.this)
@@ -601,12 +611,13 @@
                         }
                         if (consumedPages.remove(completePage.getPageId()) == null)
                         {
-                           log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+                           PageCursorImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+                                                   " from consumed pages on cursor for address " +
+                                                   pageStore.getAddress());
                         }
-                      }
+                     }
                   }
-                  
-                  
+
                   cursorProvider.scheduleCleanup();
                }
             });
@@ -638,11 +649,12 @@
       // We're holding this object to avoid delete the pages before the IO is complete,
       // however we can't delete these records again
       private boolean pendingDelete;
-      
+
       // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
       // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
 
+      @Override
       public String toString()
       {
          return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
@@ -663,15 +675,15 @@
       {
          return getNumberOfMessages() == confirmed.get();
       }
-      
+
       public boolean isPendingDelete()
       {
          return pendingDelete;
       }
-      
+
       public void setPendingDelete()
       {
-         this.pendingDelete = true;
+         pendingDelete = true;
       }
 
       /**
@@ -699,9 +711,13 @@
                                  pageId);
          }
 
-         if (getNumberOfMessages() == confirmed.incrementAndGet())
+         // Negative could mean a bookmark on the first element for the page (example -1)
+         if (posACK.getMessageNr() >= 0)
          {
-            onPageDone(this);
+            if (getNumberOfMessages() == confirmed.incrementAndGet())
+            {
+               onPageDone(this);
+            }
          }
       }
 
@@ -729,7 +745,7 @@
 
    }
 
-   static class PageCursorTX implements TransactionOperation
+   static class PageCursorTX extends TransactionOperationAbstract
    {
       HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
 
@@ -747,29 +763,9 @@
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
-       */
-      public void beforePrepare(final Transaction tx) throws Exception
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
-       */
-      public void afterPrepare(final Transaction tx)
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
-       */
-      public void beforeCommit(final Transaction tx) throws Exception
-      {
-      }
-
-      /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
        */
+      @Override
       public void afterCommit(final Transaction tx)
       {
          for (Entry<PageCursorImpl, List<PagePosition>> entry : pendingPositions.entrySet())
@@ -786,19 +782,6 @@
          }
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
-       */
-      public void beforeRollback(final Transaction tx) throws Exception
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
-       */
-      public void afterRollback(final Transaction tx)
-      {
-      }
    }
 
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-20 20:50:34 UTC (rev 9802)
@@ -112,7 +112,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+      PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)createNonPersistentCursor();
       
       PageCursor cursor = cursorProvider.createNonPersistentCursor();
       
@@ -134,6 +134,16 @@
    }
 
 
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PageCursor createNonPersistentCursor() throws Exception
+   {
+      return lookupCursorProvider().createNonPersistentCursor();
+   }
+
+
    public void testReadNextPage() throws Exception
    {
 
@@ -143,12 +153,22 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+      PageCursorProvider cursorProvider = lookupCursorProvider();
       
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
       
       assertNull(cache);
    }
+
+
+   /**
+    * @return
+    * @throws Exception
+    */
+   private PageCursorProvider lookupCursorProvider() throws Exception
+   {
+      return lookupPageStore(ADDRESS).getCursorProvier();
+   }
    
    
    public void testRestart() throws Exception
@@ -159,7 +179,7 @@
       
       System.out.println("Number of pages = " + numberOfPages);
       
-      PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      PageCursorProvider cursorProvider = lookupCursorProvider();
       
       
       PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
@@ -480,7 +500,7 @@
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+      PageCursorProvider cursorProvider = lookupCursorProvider();
       
       PageCursor cursor = cursorProvider.createNonPersistentCursor();
       PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
@@ -527,7 +547,43 @@
    
    public void testFirstMessageInTheMiddle() throws Exception
    {
+
+      final int NUM_MESSAGES = 100;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
       
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+      PageCursor cursor = cursorProvider.createNonPersistentCursor();
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+      cursor.bookmark(startingPos);
+      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+      msg.initMessage(server.getStorageManager());
+      int key = msg.getMessage().getIntProperty("key").intValue();
+      
+      msg = null;
+      
+      cache = null;
+      
+      Pair<PagePosition, PagedMessage> msgCursor = null;
+      while ((msgCursor = cursor.moveNext()) != null)
+      {
+         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor.a);
+      }
+      assertEquals(NUM_MESSAGES, key);
+      
+      
+      forceGC();
+      
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+      
+      server.stop();
+
    }
    
    private int addMessages(final int numMessages, final int messageSize) throws Exception



More information about the hornetq-commits mailing list