Author: clebert.suconic(a)jboss.com
Date: 2010-11-01 23:33:07 -0400 (Mon, 01 Nov 2010)
New Revision: 9832
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
.remove on iterators
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
02:18:28 UTC (rev 9831)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02
03:33:07 UTC (rev 9832)
@@ -17,10 +17,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -44,6 +46,7 @@
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Future;
import org.hornetq.utils.LinkedListImpl;
import org.hornetq.utils.LinkedListIterator;
@@ -243,6 +246,7 @@
*/
public void remove()
{
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
/* (non-Javadoc)
@@ -265,6 +269,8 @@
{
return new CursorIterator();
}
+
+ int validCount = 0;
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
@@ -280,9 +286,22 @@
do
{
message = cursorProvider.getNext(this, tmpPosition);
-
- if (message != null)
+
+ boolean valid = true;
+ if (message == null)
{
+ valid = false;
+ }
+ else
+ {
+ PageCursorInfo info = getPageInfo(message.a, false);
+ if (info != null && info.isRemoved(message.a))
+ {
+ valid = false;
+ }
+ }
+ if (valid)
+ {
tmpPosition = message.a;
match = match(message.b.getMessage());
@@ -603,16 +622,21 @@
System.out.println(info);
}
}
+
+ private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+ {
+ return getPageInfo(pos, true);
+ }
/**
* @param page
* @return
*/
- private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+ private synchronized PageCursorInfo getPageInfo(final PagePosition pos, boolean
create)
{
PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
- if (pageInfo == null)
+ if (create && pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(),
cache);
@@ -822,6 +846,8 @@
private final List<PagePosition> acks = Collections.synchronizedList(new
LinkedList<PagePosition>());
private WeakReference<PageCache> cache;
+
+ private Set<PagePosition> removedReferences = new
ConcurrentHashSet<PagePosition>();
// The page was live at the time of the creation
private final boolean wasLive;
@@ -878,6 +904,17 @@
{
return pageId;
}
+
+ public boolean isRemoved(final PagePosition pos)
+ {
+ return false;
+ //return removedReferences.contains(pos);
+ }
+
+ public void remove(final PagePosition position)
+ {
+ removedReferences.add(position);
+ }
public void addACK(final PagePosition posACK)
{
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
02:18:28 UTC (rev 9831)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02
03:33:07 UTC (rev 9832)
@@ -757,8 +757,8 @@
PageCursorProvider cursorProvider = lookupCursorProvider();
- PageSubscription cursor = cursorProvider.createSubscription(1, null, false);
- PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createSubscription(2, null, false);
+ PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
+ PageSubscriptionImpl cursor2 =
(PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
queue.getPageSubscription().close();
@@ -776,8 +776,6 @@
forceGC();
- //assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
for (int i = 0; i < 10; i++)
{
msg = iterator2.next();
@@ -788,6 +786,7 @@
cursor2.close();
+
lookupPageStore(ADDRESS).flushExecutors();
server.stop();