Author: clebert.suconic(a)jboss.com
Date: 2010-10-11 19:37:26 -0400 (Mon, 11 Oct 2010)
New Revision: 9771
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on cursors acks
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11
21:40:49 UTC (rev 9770)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11
23:37:26 UTC (rev 9771)
@@ -17,16 +17,15 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.Page;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -126,7 +125,7 @@
if (!match)
{
- confirmPagePosition(message.a);
+ processACK(message.a);
}
}
@@ -142,6 +141,18 @@
public void ack(final PagePosition position) throws Exception
{
store.storeCursorAcknowledge(cursorId, position);
+ store.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ processACK(position);
+ }
+ });
}
public void ackTx(final Transaction tx, final PagePosition position) throws Exception
@@ -173,7 +184,7 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- internalAdd(position);
+ // internalAdd(position);
installTXCallback(tx, position);
}
@@ -189,8 +200,7 @@
{
PageCursorInfo positions = getPageInfo(pos);
- positions.confirmed.incrementAndGet();
- positions.acks.add(pos);
+ positions.addACK(pos);
lastPosition = pos;
if (previousPos != null)
@@ -270,14 +280,13 @@
// Private -------------------------------------------------------
- private void confirmPagePosition(final PagePosition pos)
+ // To be called only after the ACK has been processed and guaranteed to be on storae
+ // The only exception is on non storage events such as not matching messages
+ private void processACK(final PagePosition pos)
{
PageCursorInfo info = getPageInfo(pos);
- if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
- {
- // todo delete previous destinations
- }
+ info.addACK(pos);
}
/**
@@ -311,13 +320,21 @@
tx.addOperation(cursorTX);
}
+ cursorTX.addPositionConfirmation(this, position);
+
}
+
+ // A callback from the PageCursorInfo. It will be called when all the messages on a
page have been acked
+ private void onPageDone(PageCursorInfo info)
+ {
+ System.out.println("Page " + info.getPageId() + " has
completed");
+ }
// Inner classes -------------------------------------------------
- private static class PageCursorInfo
+ private class PageCursorInfo
{
// Number of messages existent on this page
private final int numberOfMessages;
@@ -325,7 +342,7 @@
private final long pageId;
// Confirmed ACKs on this page
- private final List<PagePosition> acks = new
LinkedList<PagePosition>();
+ private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
// We need a separate counter as the cursor may be ignoring certain values because
of incomplete transactions or expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
@@ -343,6 +360,11 @@
{
return numberOfMessages;
}
+
+ public boolean isDone()
+ {
+ return numberOfMessages == confirmed.get();
+ }
/**
* @return the pageId
@@ -354,7 +376,16 @@
public void addACK(final PagePosition posACK)
{
- this.acks.add(posACK);
+ if (posACK.getRecordID() > 0)
+ {
+ // We store these elements for later cleanup
+ this.acks.add(posACK);
+ }
+
+ if (numberOfMessages == confirmed.incrementAndGet())
+ {
+ PageCursorImpl.this.onPageDone(this);
+ }
}
}
@@ -410,7 +441,7 @@
for (PagePosition confirmed : positions)
{
- cursor.confirmPagePosition(confirmed);
+ cursor.processACK(confirmed);
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11
21:40:49 UTC (rev 9770)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11
23:37:26 UTC (rev 9771)
@@ -115,6 +115,7 @@
while ((msg = cursor.moveNext()) != null)
{
assertEquals(key++, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
@@ -270,21 +271,25 @@
server.start();
cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
for (int i = 10; i <= 20; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ackTx(tx,msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ackTx(tx,msg.a);
}
+ tx.commit();
+
}
Show replies by date