[hornetq-commits] JBoss hornetq SVN: r9796 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 19 19:58:09 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-19 19:58:09 -0400 (Tue, 19 Oct 2010)
New Revision: 9796

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Log:
Improving delete process

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-19 21:28:15 UTC (rev 9795)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-19 23:58:09 UTC (rev 9796)
@@ -416,7 +416,12 @@
 
    public void printDebug()
    {
-      System.out.println("Debug information on PageCurorImpl- " + this);
+      printDebug(this.toString());
+   }
+   
+   public void printDebug(String msg)
+   {
+      System.out.println("Debug information on PageCurorImpl- " + msg);
       for (PageCursorInfo info : consumedPages.values())
       {
          System.out.println(info);
@@ -540,7 +545,8 @@
       {
          for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
          {
-            if (entry.getValue().isDone())
+            PageCursorInfo info = entry.getValue();
+            if (info.isDone() && !info.isPendingDelete())
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
@@ -548,6 +554,7 @@
                }
                else
                {
+                  info.setPendingDelete();
                   completedPages.add(entry.getValue());
                }
             }
@@ -592,11 +599,14 @@
                         {
                            PageCursorImpl.trace("Removing page " + completePage.getPageId());
                         }
-                        trace("Removing page " + completePage.getPageId());
-                        consumedPages.remove(completePage.getPageId());
-                     }
+                        if (consumedPages.remove(completePage.getPageId()) == null)
+                        {
+                           log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+                        }
+                      }
                   }
                   
+                  
                   cursorProvider.scheduleCleanup();
                }
             });
@@ -624,13 +634,18 @@
       // The page was live at the time of the creation
       private final boolean wasLive;
 
+      // There's a pending delete on the async IO pipe
+      // We're holding this object to avoid delete the pages before the IO is complete,
+      // however we can't delete these records again
+      private boolean pendingDelete;
+      
       // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
       // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
 
       public String toString()
       {
-         return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage = " + numberOfMessages;
+         return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
       }
 
       public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
@@ -648,6 +663,16 @@
       {
          return getNumberOfMessages() == confirmed.get();
       }
+      
+      public boolean isPendingDelete()
+      {
+         return pendingDelete;
+      }
+      
+      public void setPendingDelete()
+      {
+         this.pendingDelete = true;
+      }
 
       /**
        * @return the pageId



More information about the hornetq-commits mailing list