Author: clebert.suconic(a)jboss.com
Date: 2010-11-04 23:46:12 -0400 (Thu, 04 Nov 2010)
New Revision: 9849
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
backup
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-05 02:18:58 UTC (rev 9848)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-05 03:46:12 UTC (rev 9849)
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
@@ -90,8 +91,6 @@
private final Executor executor;
- private volatile PagePosition lastPosition;
-
private volatile PagePosition lastAckedPosition;
private List<PagePosition> recoveredACK;
@@ -156,13 +155,6 @@
public void bookmark(PagePosition position) throws Exception
{
- if (lastPosition != null)
- {
- throw new RuntimeException("Bookmark can only be done at the time of the
cursor's creation");
- }
-
- lastPosition = position;
-
PageCursorInfo cursorInfo = getPageInfo(position);
if (position.getMessageNr() > 0)
@@ -175,7 +167,7 @@
class CursorIterator implements LinkedListIterator<PagedReference>
{
- PagePosition position = getLastPosition();
+ PagePosition position = null;
PagePosition lastOperation = null;
@@ -197,7 +189,7 @@
{
if (lastOperation == null)
{
- position = getLastPosition();
+ position = null;
}
else
{
@@ -218,6 +210,7 @@
cachedNext = null;
return retPos;
}
+
try
{
if (redeliveryIterator.hasNext())
@@ -229,6 +222,11 @@
{
isredelivery = false;
}
+
+ if (position == null)
+ {
+ position = getStartPosition();
+ }
PagedReferenceImpl nextPos = moveNext(position);
if (nextPos != null)
@@ -342,16 +340,38 @@
/**
*
*/
- private PagePosition getLastPosition()
+ private synchronized PagePosition getStartPosition()
{
- if (lastPosition == null)
+ // Get the first page not marked for deletion
+ // It's important to verify if it's not marked for deletion as you may have a pending request on the queue
+ for (Map.Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
- return new PagePositionImpl(pageStore.getFirstPage(), -1);
+ if (!entry.getValue().isPendingDelete())
+ {
+ if (entry.getValue().acks.isEmpty())
+ {
+ return new PagePositionImpl(entry.getKey(), -1);
+ }
+ else
+ {
+ // The list is not ordered...
+ // This is only done at creation of the queue, so we just scan instead of keeping the list ordered
+ PagePosition retValue = null;
+
+ for (PagePosition pos : entry.getValue().acks)
+ {
+ if (retValue == null || retValue.getMessageNr() <
pos.getMessageNr())
+ {
+ retValue = pos;
+ }
+ }
+
+ return retValue;
+ }
+ }
}
- else
- {
- return lastPosition;
- }
+
+ return new PagePositionImpl(pageStore.getFirstPage(), -1);
}
/* (non-Javadoc)
@@ -561,10 +581,10 @@
Collections.sort(recoveredACK);
boolean first = true;
-
- PagePosition previousPos = null;
+
for (PagePosition pos : recoveredACK)
{
+ lastAckedPosition = pos;
PageCursorInfo positions = getPageInfo(pos);
if (first)
{
@@ -576,50 +596,8 @@
}
positions.addACK(pos);
-
- lastPosition = pos;
- if (previousPos != null)
- {
- if (!previousPos.isRightAfter(previousPos))
- {
- PagePosition tmpPos = previousPos;
- // looking for holes on the ack list for redelivery
- while (true)
- {
- PagedReferenceImpl msgCheck = cursorProvider.getNext(this, tmpPos);
-
- positions = getPageInfo(tmpPos);
-
- // end of the hole, we can finish processing here
- // It may be also that the next was just a next page, so we just ignore it
- if (msgCheck == null || msgCheck.getPosition().equals(pos))
- {
- break;
- }
- else
- {
- if (match(msgCheck.getMessage()))
- {
- redeliver(msgCheck.getPosition());
- }
- else
- {
- // The reference was ignored. But we must take a count from the reference count
- // otherwise the page will never be deleted hence we would never leave paging even if
- // everything was consumed
- positions.confirmed.incrementAndGet();
- }
- }
- tmpPos = msgCheck.getPosition();
- }
- }
- }
-
- previousPos = pos;
}
- lastAckedPosition = lastPosition;
-
recoveredACK.clear();
recoveredACK = null;
}
@@ -948,11 +926,8 @@
public void addACK(final PagePosition posACK)
{
- if (posACK.getRecordID() > 0)
- {
- // We store these elements for later cleanup
- acks.add(posACK);
- }
+ removedReferences.add(posACK);
+ acks.add(posACK);
if (isTrace)
{
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-05 02:18:58 UTC (rev 9848)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-05 03:46:12 UTC (rev 9849)
@@ -373,6 +373,8 @@
cursor.ack(msg);
}
}
+
+ server.getStorageManager().waitOnOperations();
server.stop();
@@ -841,6 +843,10 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() /
2);
cursor.bookmark(startingPos);
+
+ // We can't proceed until the operation has finished
+ server.getStorageManager().waitOnOperations();
+
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int initialKey = msg.getMessage().getIntProperty("key").intValue();