Author: clebert.suconic(a)jboss.com
Date: 2010-10-14 17:40:19 -0400 (Thu, 14 Oct 2010)
New Revision: 9787
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Improving cleanup
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14
09:43:10 UTC (rev 9786)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14
21:40:19 UTC (rev 9787)
@@ -29,6 +29,8 @@
// Cursor query operations --------------------------------------
+ void stop();
+
Pair<PagePosition, ServerMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
@@ -55,4 +57,6 @@
* @param position
*/
void redeliver(PagePosition position);
+
+ void printDebug();
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14
09:43:10 UTC (rev 9786)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14
21:40:19 UTC (rev 9787)
@@ -41,6 +41,7 @@
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
/**
* A PageCursorImpl
@@ -57,11 +58,11 @@
// Attributes ----------------------------------------------------
- private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+ private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
private static void trace(final String message)
{
- //PageCursorImpl.log.info(message);
+ // PageCursorImpl.log.info(message);
System.out.println(message);
}
@@ -77,6 +78,8 @@
private volatile PagePosition lastPosition;
+ private volatile PagePosition lastAckedPosition;
+
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
@@ -275,11 +278,29 @@
previousPos = pos;
}
+ this.lastAckedPosition = lastPosition;
+
recoveredACK.clear();
recoveredACK = null;
}
}
+
+ public void stop()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ future.await(1000);
+ }
+ public void printDebug()
+ {
+ System.out.println("Debug information on PageCurorImpl- " + this);
+ for (PageCursorInfo info : consumedPages.values())
+ {
+ System.out.println(info);
+ }
+ }
+
/**
* @param page
* @return
@@ -315,7 +336,10 @@
// The only exception is on non storage events such as not matching messages
private void processACK(final PagePosition pos)
{
- System.out.println("Processing ack for " + pos);
+ if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
+ {
+ this.lastAckedPosition = pos;
+ }
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
@@ -387,7 +411,14 @@
{
if (entry.getValue().isDone())
{
- completedPages.add(entry.getValue());
+ if (entry.getKey() == lastAckedPosition.getPageNr())
+ {
+ System.out.println("We can't clear page " +
entry.getKey() + " now since it's the current page");
+ }
+ else
+ {
+ completedPages.add(entry.getValue());
+ }
}
}
}
@@ -396,27 +427,8 @@
{
PageCursorInfo info = completedPages.get(i);
- boolean firstLine = true;
for (PagePosition pos : info.acks)
{
- if (firstLine)
- {
- firstLine = false;
- // We only do this check at the first line
- PageCache cache = pos.getPageCache();
- // The live cache has a hard reference on the PagingStoreImpl,
- // So we are sure the reference would be filled on the PagePosition
- if (cache != null && cache.isLive())
- {
- completedPages.remove(i);
- break;
- }
- if (isTrace)
- {
- PageCursorImpl.trace("Cleaning ACK records on page " +
info.getPageId());
- }
- }
-
if (pos.getRecordID() > 0)
{
store.deleteCursorAcknowledgeTransactional(tx.getID(),
pos.getRecordID());
@@ -444,6 +456,7 @@
{
PageCursorImpl.trace("Removing page " +
completePage.getPageId());
}
+ System.out.println("Removing page " +
completePage.getPageId());
consumedPages.remove(completePage.getPageId());
}
}
@@ -475,6 +488,11 @@
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+ public String toString()
+ {
+ return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage =
" + numberOfMessages;
+ }
+
public PageCursorInfo(final long pageId, final int numberOfMessages, final
PageCache cache)
{
this.pageId = pageId;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14
09:43:10 UTC (rev 9786)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14
21:40:19 UTC (rev 9787)
@@ -192,6 +192,11 @@
public void stop()
{
+ for (PageCursor cursor : activeCursors.values())
+ {
+ cursor.stop();
+ }
+
activeCursors.clear();
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14
09:43:10 UTC (rev 9786)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14
21:40:19 UTC (rev 9787)
@@ -25,6 +25,7 @@
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -153,45 +154,61 @@
System.out.println("Number of pages = " + numberOfPages);
PageCursorProviderImpl cursorProvider =
(PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- cursorProvider.printDebug();
PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
+
+ int firstPageSize = firstPage.getNumberOfMessages();
+ firstPage = null;
+
System.out.println("Cursor: " + cursor);
+ cursorProvider.printDebug();
+
for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- cursorProvider.printDebug();
assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
- if (i < 500)
+ if (i < firstPageSize)
{
cursor.ack(msg.a);
}
}
+ cursorProvider.printDebug();
+
+ // needs to clear the context since we are using the same thread over two distinct
servers
+ // otherwise we will get the old executor on the factory
+ OperationContextImpl.clearContext();
- OperationContextImpl.getContext(null).waitCompletion();
-
server.stop();
server.start();
cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
- for (int i = 500; i < NUM_MESSAGES; i++)
+ for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
+ System.out.println("Received " + i);
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
+
cursor.ack(msg.a);
+
+ OperationContextImpl.getContext(null).waitCompletion();
+
}
+
+ OperationContextImpl.getContext(null).waitCompletion();
+ ((PageCursorImpl)cursor).printDebug();
-
}
-
public void testRestartWithHoleOnAck() throws Exception
{
@@ -424,6 +441,7 @@
protected void setUp() throws Exception
{
super.setUp();
+ OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
Configuration config = createDefaultConfig();
@@ -445,6 +463,7 @@
protected void tearDown() throws Exception
{
+ OperationContextImpl.clearContext();
server.stop();
super.tearDown();
}