Author: clebert.suconic(a)jboss.com
Date: 2010-10-20 18:45:48 -0400 (Wed, 20 Oct 2010)
New Revision: 9804
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
more on bookmarking cursors
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -358,11 +358,21 @@
PageCursorImpl.trace("********** processing reload!!!!!!!");
}
Collections.sort(recoveredACK);
+
+ boolean first = true;
PagePosition previousPos = null;
for (PagePosition pos : recoveredACK)
{
PageCursorInfo positions = getPageInfo(pos);
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ positions.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
positions.addACK(pos);
@@ -556,7 +566,7 @@
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete())
+ if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -130,7 +130,7 @@
while (true)
{
- Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
+ Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -165,7 +165,7 @@
}
}
- private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+ private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -308,6 +308,7 @@
Page page = pagingStore.depage();
if (page != null)
{
+ System.out.println("Deleting " + page);
page.delete();
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -583,9 +583,80 @@
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
server.stop();
+
+ server.start();
+
+ Thread.sleep(1000);
+ assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+
+
}
+
+ public void testFirstMessageInTheMiddlePersistent() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+ PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ cursor.bookmark(startingPos);
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+ msg.initMessage(server.getStorageManager());
+ int initialKey = msg.getMessage().getIntProperty("key").intValue();
+ int key = initialKey;
+
+ msg = null;
+
+ cache = null;
+
+ Pair<PagePosition, PagedMessage> msgCursor = null;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ server.start();
+
+ cursorProvider = lookupCursorProvider();
+ cursor = cursorProvider.getPersistentCursor(queue.getID());
+ key = initialKey;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor.a);
+ }
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ // This is to make sure all the pending files will be deleted
+ server.stop();
+
+ server.start();
+
+ // TODO: this should be exact 2
+ assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+
+ }
+
private int addMessages(final int numMessages, final int messageSize) throws Exception
{
return addMessages(0, numMessages, messageSize);
Show replies by date