Author: clebert.suconic(a)jboss.com
Date: 2010-10-13 20:45:50 -0400 (Wed, 13 Oct 2010)
New Revision: 9785
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing first step on cleanup after a whole page is consumed
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -38,6 +38,11 @@
private final Page page;
private boolean isLive = true;
+
+ public String toString()
+ {
+ return "LivePacheCacheImpl::page=" + page.getPageId() + " number of
messages=" + messages.size() + " isLive = " + isLive;
+ }
// Static --------------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -115,8 +115,13 @@
*/
public boolean isLive()
{
- return true;
+ return false;
}
+
+ public String toString()
+ {
+ return "PageCacheImpl::page=" + page.getPageId() + "
numberOfMessages = " + messages.length;
+ }
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor.impl;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -56,6 +57,14 @@
// Attributes ----------------------------------------------------
+ private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+
+ private static void trace(final String message)
+ {
+ //PageCursorImpl.log.info(message);
+ System.out.println(message);
+ }
+
private final StorageManager store;
private final long cursorId;
@@ -190,6 +199,7 @@
*/
public void reloadACK(final PagePosition position)
{
+ System.out.println("reloading " + position);
if (recoveredACK == null)
{
recoveredACK = new LinkedList<PagePosition>();
@@ -211,7 +221,10 @@
{
if (recoveredACK != null)
{
- System.out.println("********** processing reload!!!!!!!");
+ if (isTrace)
+ {
+ PageCursorImpl.trace("********** processing reload!!!!!!!");
+ }
Collections.sort(recoveredACK);
PagePosition previousPos = null;
@@ -260,7 +273,6 @@
}
previousPos = pos;
- System.out.println("pos: " + pos);
}
recoveredACK.clear();
@@ -272,14 +284,15 @@
* @param page
* @return
*/
- private PageCursorInfo getPageInfo(final PagePosition pos)
+ private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
if (pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
- pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+ System.out.println("Number of Messages = " +
cache.getNumberOfMessages());
+ pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(),
cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
@@ -302,6 +315,7 @@
// The only exception is on non storage events such as not matching messages
private void processACK(final PagePosition pos)
{
+ System.out.println("Processing ack for " + pos);
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
@@ -338,8 +352,6 @@
*/
private void onPageDone(final PageCursorInfo info)
{
- System.out.println("Page " + info.getPageId() + " has
completed");
-
executor.execute(new Runnable()
{
@@ -351,7 +363,7 @@
}
catch (Exception e)
{
- PageCursorImpl.log.warn("Error on cleaning up cursor pages");
+ PageCursorImpl.log.warn("Error on cleaning up cursor pages",
e);
}
}
});
@@ -368,7 +380,7 @@
final ArrayList<PageCursorInfo> completedPages = new
ArrayList<PageCursorInfo>();
- // First get the completed pages using a lock
+ // First get the completed pages using a lock
synchronized (this)
{
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
@@ -380,10 +392,31 @@
}
}
- for (PageCursorInfo info : completedPages)
+ for (int i = 0; i < completedPages.size(); i++)
{
+ PageCursorInfo info = completedPages.get(i);
+
+ boolean firstLine = true;
for (PagePosition pos : info.acks)
{
+ if (firstLine)
+ {
+ firstLine = false;
+ // We only do this check at the first line
+ PageCache cache = pos.getPageCache();
+ // The live cache has a hard reference on the PagingStoreImpl,
+ // So we are sure the reference would be filled on the PagePosition
+ if (cache != null && cache.isLive())
+ {
+ completedPages.remove(i);
+ break;
+ }
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Cleaning ACK records on page " +
info.getPageId());
+ }
+ }
+
if (pos.getRecordID() > 0)
{
store.deleteCursorAcknowledgeTransactional(tx.getID(),
pos.getRecordID());
@@ -407,7 +440,10 @@
{
for (PageCursorInfo completePage : completedPages)
{
- System.out.println("Removing page " +
completePage.getPageId());
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Removing page " +
completePage.getPageId());
+ }
consumedPages.remove(completePage.getPageId());
}
}
@@ -430,19 +466,29 @@
// Confirmed ACKs on this page
private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
+ private WeakReference<PageCache> cache;
+
+ // The page was live at the time of the creation
+ private final boolean wasLive;
+
// We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
- public PageCursorInfo(final long pageId, final int numberOfMessages)
+ public PageCursorInfo(final long pageId, final int numberOfMessages, final
PageCache cache)
{
this.pageId = pageId;
this.numberOfMessages = numberOfMessages;
+ wasLive = cache.isLive();
+ if (wasLive)
+ {
+ this.cache = new WeakReference<PageCache>(cache);
+ }
}
public boolean isDone()
{
- return numberOfMessages == confirmed.get();
+ return getNumberOfMessages() == confirmed.get();
}
/**
@@ -461,12 +507,43 @@
acks.add(posACK);
}
- if (numberOfMessages == confirmed.incrementAndGet())
+ if (isTrace)
{
+ PageCursorImpl.trace("numberOfMessages = " + getNumberOfMessages()
+
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ ", page = " +
+ pageId);
+ }
+
+ if (getNumberOfMessages() == confirmed.incrementAndGet())
+ {
onPageDone(this);
}
}
+ private int getNumberOfMessages()
+ {
+ if (wasLive)
+ {
+ PageCache cache = this.cache.get();
+ if (cache != null)
+ {
+ return cache.getNumberOfMessages();
+ }
+ else
+ {
+ cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
+ this.cache = new WeakReference<PageCache>(cache);
+ return cache.getNumberOfMessages();
+ }
+ }
+ else
+ {
+ return numberOfMessages;
+ }
+ }
+
}
static class PageCursorTX implements TransactionOperation
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -194,6 +194,14 @@
{
activeCursors.clear();
}
+
+ public void printDebug()
+ {
+ for (PageCache cache: softCache.values())
+ {
+ System.out.println("Cache " + cache);
+ }
+ }
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -15,7 +15,6 @@
import java.lang.ref.WeakReference;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PagePosition;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -559,13 +559,7 @@
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
Page page = new PageImpl(storeName, storageManager, fileFactory, file,
pageNumber);
-
- LivePageCache pageCache = new LivePageCacheImpl(page);
-
- page.setLiveCache(pageCache);
- cursorProvider.addPageCache(pageCache);
-
// To create the file
file.open();
@@ -1209,7 +1203,13 @@
}
currentPage = createPage(currentPageId);
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ currentPage.setLiveCache(pageCache);
+ cursorProvider.addPageCache(pageCache);
+
currentPageSize.set(0);
currentPage.open();
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-13
15:43:57 UTC (rev 9784)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14
00:45:50 UTC (rev 9785)
@@ -152,15 +152,18 @@
System.out.println("Number of pages = " + numberOfPages);
- PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
+ PageCursorProviderImpl cursorProvider =
(PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ cursorProvider.printDebug();
+
PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ cursorProvider.printDebug();
+ assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
if (i < 500)
@@ -328,7 +331,7 @@
assertNotNull(readMessage);
- // TODO: ack on live data
+ cursor.ack(readMessage.a);
assertEquals(i, readMessage.b.getIntProperty("key").intValue());