[hornetq-commits] JBoss hornetq SVN: r9804 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 20 18:45:48 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-20 18:45:48 -0400 (Wed, 20 Oct 2010)
New Revision: 9804

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
more on bookmarking cursors

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-20 22:45:48 UTC (rev 9804)
@@ -358,11 +358,21 @@
             PageCursorImpl.trace("********** processing reload!!!!!!!");
          }
          Collections.sort(recoveredACK);
+         
+         boolean first = true;
 
          PagePosition previousPos = null;
          for (PagePosition pos : recoveredACK)
          {
             PageCursorInfo positions = getPageInfo(pos);
+            if (first)
+            {
+               first = false;
+               if (pos.getMessageNr() > 0)
+               {
+                  positions.confirmed.addAndGet(pos.getMessageNr());
+               }
+            }
 
             positions.addACK(pos);
 
@@ -556,7 +566,7 @@
          for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
          {
             PageCursorInfo info = entry.getValue();
-            if (info.isDone() && !info.isPendingDelete())
+            if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-20 22:45:48 UTC (rev 9804)
@@ -130,7 +130,7 @@
 
       while (true)
       {
-         Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
+         Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
 
          if (retPos == null)
          {
@@ -165,7 +165,7 @@
       }
    }
 
-   private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+   private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition pos)
    {
       PagePosition retPos = pos.nextMessage();
 
@@ -308,6 +308,7 @@
             Page page = pagingStore.depage();
             if (page != null)
             {
+               System.out.println("Deleting " + page);
                page.delete();
             }
          }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-20 22:45:48 UTC (rev 9804)
@@ -583,9 +583,80 @@
       assertTrue(cursorProvider.getCacheSize() < numberOfPages);
       
       server.stop();
+      
+      server.start();
+      
+      Thread.sleep(1000);
+      assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+      
+      
 
    }
    
+   
+   public void testFirstMessageInTheMiddlePersistent() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+      
+      PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+      PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
+      PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+      cursor.bookmark(startingPos);
+      PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+      msg.initMessage(server.getStorageManager());
+      int initialKey = msg.getMessage().getIntProperty("key").intValue();
+      int key = initialKey;
+      
+      msg = null;
+      
+      cache = null;
+      
+      Pair<PagePosition, PagedMessage> msgCursor = null;
+      while ((msgCursor = cursor.moveNext()) != null)
+      {
+         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+      }
+      assertEquals(NUM_MESSAGES, key);
+      
+      
+      server.stop();
+      
+      OperationContextImpl.clearContext();
+      
+      server.start();
+      
+      cursorProvider = lookupCursorProvider();
+      cursor = cursorProvider.getPersistentCursor(queue.getID());
+      key = initialKey;
+      while ((msgCursor = cursor.moveNext()) != null)
+      {
+         assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msgCursor.a);
+      }
+      
+      
+      forceGC();
+      
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+      
+      // This is to make sure all the pending files will be deleted
+      server.stop();
+      
+      server.start();
+      
+      // TODO: this should be exactly 2
+      assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+
+   }
+   
    private int addMessages(final int numMessages, final int messageSize) throws Exception
    {
       return addMessages(0, numMessages, messageSize);



More information about the hornetq-commits mailing list