Author: clebert.suconic(a)jboss.com
Date: 2010-10-20 16:50:34 -0400 (Wed, 20 Oct 2010)
New Revision: 9802
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding bookmarking for starting a cursor on a later position
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -32,6 +32,8 @@
// To be called before the server is down
void stop();
+ void bookmark(PagePosition position) throws Exception;
+
/** It will be 0 if non persistent cursor */
public long getId();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -74,6 +74,8 @@
// to be used on tests -------------------------------------------
int getCacheSize();
+
+ void printDebug();
// Package protected ---------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -39,7 +39,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -110,9 +109,28 @@
public PageCursorProvider getProvider()
{
- return this.cursorProvider;
+ return cursorProvider;
}
+ public void bookmark(PagePosition position) throws Exception
+ {
+ if (lastPosition != null)
+ {
+ throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
+ }
+
+ lastPosition = position;
+
+ PageCursorInfo cursorInfo = getPageInfo(position);
+
+ if (position.getMessageNr() > 0)
+ {
+ cursorInfo.confirmed.addAndGet(position.getMessageNr() - 1);
+ }
+
+ ack(position);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -195,7 +213,6 @@
installTXCallback(tx, position);
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
@@ -212,7 +229,6 @@
}
}
-
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -242,27 +258,24 @@
installTXCallback(tx, position);
}
-
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void positionIgnored(PagePosition position)
+ public void positionIgnored(final PagePosition position)
{
processACK(position);
}
-
-
+
/**
* All the data associated with the cursor should go away here
*/
- public void close() throws Exception
+ public void close() throws Exception
{
final long tx = store.generateUniqueID();
-
+
final ArrayList<Exception> ex = new ArrayList<Exception>();
-
+
final AtomicBoolean isPersistent = new AtomicBoolean(false);
-
// We can't delete the records at the caller's thread
// because an executor may be holding the synchronized on PageCursorImpl
@@ -295,24 +308,24 @@
catch (Exception e)
{
ex.add(e);
- log.warn(e.getMessage(), e);
+ PageCursorImpl.log.warn(e.getMessage(), e);
}
}
});
-
+
Future future = new Future();
-
+
executor.execute(future);
-
+
while (!future.await(5000))
{
- log.warn("Timeout on waiting cursor " + this + " to be closed");
+ PageCursorImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
}
-
-
+
if (isPersistent.get())
{
- // Another reason to perform the commit at the main thread is because the OperationContext may only send the result to the client when
+ // Another reason to perform the commit at the main thread is because the OperationContext may only send the
+ // result to the client when
// the IO on commit is done
if (ex.size() == 0)
{
@@ -324,10 +337,9 @@
throw ex.get(0);
}
}
-
+
cursorProvider.close(this);
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getId()
@@ -337,8 +349,6 @@
return cursorId;
}
-
-
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -397,29 +407,29 @@
previousPos = pos;
}
- this.lastAckedPosition = lastPosition;
+ lastAckedPosition = lastPosition;
recoveredACK.clear();
recoveredACK = null;
}
}
-
+
public void stop()
{
Future future = new Future();
executor.execute(future);
while (!future.await(1000))
{
- log.warn("Waiting page cursor to finish executors - " + this);
+ PageCursorImpl.log.warn("Waiting page cursor to finish executors - " +
this);
}
}
public void printDebug()
{
- printDebug(this.toString());
+ printDebug(toString());
}
-
- public void printDebug(String msg)
+
+ public void printDebug(final String msg)
{
System.out.println("Debug information on PageCurorImpl- " + msg);
for (PageCursorInfo info : consumedPages.values())
@@ -469,7 +479,7 @@
// there's a different page being acked, we will do the check right away
scheduleCleanupCheck();
}
- this.lastAckedPosition = pos;
+ lastAckedPosition = pos;
}
PageCursorInfo info = getPageInfo(pos);
@@ -550,7 +560,7 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+ PageCursorImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
}
else
{
@@ -588,7 +598,7 @@
{
executor.execute(new Runnable()
{
-
+
public void run()
{
synchronized (PageCursorImpl.this)
@@ -601,12 +611,13 @@
}
if (consumedPages.remove(completePage.getPageId()) == null)
{
- log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+ PageCursorImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+ " from consumed pages on cursor for address " +
+ pageStore.getAddress());
}
- }
+ }
}
-
-
+
cursorProvider.scheduleCleanup();
}
});
@@ -638,11 +649,12 @@
// We're holding this object to avoid delete the pages before the IO is
complete,
// however we can't delete these records again
private boolean pendingDelete;
-
+
// We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+ @Override
public String toString()
{
return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
@@ -663,15 +675,15 @@
{
return getNumberOfMessages() == confirmed.get();
}
-
+
public boolean isPendingDelete()
{
return pendingDelete;
}
-
+
public void setPendingDelete()
{
- this.pendingDelete = true;
+ pendingDelete = true;
}
/**
@@ -699,9 +711,13 @@
pageId);
}
- if (getNumberOfMessages() == confirmed.incrementAndGet())
+ // Negative could mean a bookmark on the first element for the page (example -1)
+ if (posACK.getMessageNr() >= 0)
{
- onPageDone(this);
+ if (getNumberOfMessages() == confirmed.incrementAndGet())
+ {
+ onPageDone(this);
+ }
}
}
@@ -729,7 +745,7 @@
}
- static class PageCursorTX implements TransactionOperation
+ static class PageCursorTX extends TransactionOperationAbstract
{
HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new
HashMap<PageCursorImpl, List<PagePosition>>();
@@ -747,29 +763,9 @@
}
/* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
- */
- public void beforePrepare(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
- */
- public void afterPrepare(final Transaction tx)
- {
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
- */
- public void beforeCommit(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
* @see
org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
*/
+ @Override
public void afterCommit(final Transaction tx)
{
for (Entry<PageCursorImpl, List<PagePosition>> entry :
pendingPositions.entrySet())
@@ -786,19 +782,6 @@
}
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
- */
- public void beforeRollback(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
- */
- public void afterRollback(final Transaction tx)
- {
- }
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -112,7 +112,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+ PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)createNonPersistentCursor();
PageCursor cursor = cursorProvider.createNonPersistentCursor();
@@ -134,6 +134,16 @@
}
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursor createNonPersistentCursor() throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor();
+ }
+
+
public void testReadNextPage() throws Exception
{
@@ -143,12 +153,22 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
assertNull(cache);
}
+
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursorProvider lookupCursorProvider() throws Exception
+ {
+ return lookupPageStore(ADDRESS).getCursorProvier();
+ }
public void testRestart() throws Exception
@@ -159,7 +179,7 @@
System.out.println("Number of pages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider =
(PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
@@ -480,7 +500,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCursor cursor = cursorProvider.createNonPersistentCursor();
PageCursorImpl cursor2 =
(PageCursorImpl)cursorProvider.createNonPersistentCursor();
@@ -527,7 +547,43 @@
public void testFirstMessageInTheMiddle() throws Exception
{
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ cursor.bookmark(startingPos);
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+ msg.initMessage(server.getStorageManager());
+ int key = msg.getMessage().getIntProperty("key").intValue();
+
+ msg = null;
+
+ cache = null;
+
+ Pair<PagePosition, PagedMessage> msgCursor = null;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor.a);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ server.stop();
+
}
private int addMessages(final int numMessages, final int messageSize) throws
Exception