Author: clebert.suconic(a)jboss.com
Date: 2010-10-19 19:58:09 -0400 (Tue, 19 Oct 2010)
New Revision: 9796
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Log:
Improving delete process
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 23:58:09 UTC (rev 9796)
@@ -416,7 +416,12 @@
public void printDebug()
{
- System.out.println("Debug information on PageCurorImpl- " + this);
+ printDebug(this.toString());
+ }
+
+ public void printDebug(String msg)
+ {
+ System.out.println("Debug information on PageCurorImpl- " + msg);
for (PageCursorInfo info : consumedPages.values())
{
System.out.println(info);
@@ -540,7 +545,8 @@
{
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
- if (entry.getValue().isDone())
+ PageCursorInfo info = entry.getValue();
+ if (info.isDone() && !info.isPendingDelete())
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
@@ -548,6 +554,7 @@
}
else
{
+ info.setPendingDelete();
completedPages.add(entry.getValue());
}
}
@@ -592,11 +599,14 @@
{
PageCursorImpl.trace("Removing page " + completePage.getPageId());
}
- trace("Removing page " + completePage.getPageId());
- consumedPages.remove(completePage.getPageId());
- }
+ if (consumedPages.remove(completePage.getPageId()) == null)
+ {
+ log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+ }
+ }
}
+
cursorProvider.scheduleCleanup();
}
});
@@ -624,13 +634,18 @@
// The page was live at the time of the creation
private final boolean wasLive;
+ // There's a pending delete on the async IO pipe
+ // We're holding this object to avoid delete the pages before the IO is complete,
+ // however we can't delete these records again
+ private boolean pendingDelete;
+
// We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
public String toString()
{
- return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage = " + numberOfMessages;
+ return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
@@ -648,6 +663,16 @@
{
return getNumberOfMessages() == confirmed.get();
}
+
+ public boolean isPendingDelete()
+ {
+ return pendingDelete;
+ }
+
+ public void setPendingDelete()
+ {
+ this.pendingDelete = true;
+ }
/**
* @return the pageId
Show replies by date