Author: clebert.suconic(a)jboss.com
Date: 2010-10-29 20:06:04 -0400 (Fri, 29 Oct 2010)
New Revision: 9822
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
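Changes: add PageCursor.iterator() returning a LinkedListIterator, rename PageCursorImpl.LocalIterator to CursorIterator and make its next() tolerate a null result from moveNext(position), and switch PageCursorTest from cursor.moveNext() to the new iterator.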
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCursor
@@ -37,6 +38,8 @@
/** It will be 0 if non persistent cursor */
public long getId();
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void close() throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -152,7 +152,7 @@
}
- class LocalIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
+ class CursorIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
{
PagePosition position = getLastPosition();
@@ -191,7 +191,14 @@
{
Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
lastOperation = position;
- position = nextPos.a;
+ if (nextPos == null)
+ {
+ position = null;
+ }
+ else
+ {
+ position = nextPos.a;
+ }
return nextPos;
}
catch (Exception e)
@@ -219,8 +226,20 @@
{
}
}
+
+
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
+ */
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+ {
+ return new CursorIterator();
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition
posision) throws Exception
@@ -923,5 +942,4 @@
}
}
-
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -46,6 +46,7 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCacheTest
@@ -118,14 +119,24 @@
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
int key = 0;
- while ((msg = cursor.moveNext()) != null)
+ while ((msg = iterator.next()) != null)
{
assertEquals(key++,
msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
+
+ server.getStorageManager().waitOnOperations();
+
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
forceGC();
assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
@@ -194,8 +205,13 @@
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven =
cursorEven.iterator();
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd =
cursorOdd.iterator();
+
+
int key = 0;
- while ((msg = cursorEven.moveNext()) != null)
+ while ((msg = iteratorEven.next()) != null)
{
assertEquals(key,
msg.b.getMessage().getIntProperty("key").intValue());
assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -205,7 +221,7 @@
assertEquals(NUM_MESSAGES, key);
key = 1;
- while ((msg = cursorOdd.moveNext()) != null)
+ while ((msg = iteratorOdd.next()) != null)
{
assertEquals(key,
msg.b.getMessage().getIntProperty("key").intValue());
assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -271,11 +287,13 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
for (int i = 0; i < 1000; i++)
{
System.out.println("Reading Msg : " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
@@ -305,10 +323,12 @@
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
+
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
@@ -356,9 +376,10 @@
.createPersistentCursor(queue.getID(), null);
System.out.println("Cursor: " + cursor);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -376,17 +397,18 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
@@ -422,9 +444,12 @@
System.out.println("Cursor: " + cursor);
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -446,17 +471,18 @@
.getPersistentCursor(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+ iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg.a);
}
@@ -495,6 +521,8 @@
System.out.println("Cursor: " + cursor);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
if (i % 100 == 0)
@@ -509,13 +537,13 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
}
server.stop();
@@ -530,6 +558,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 2; i++)
{
@@ -549,7 +578,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
@@ -568,6 +597,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 3; i++)
{
@@ -587,7 +617,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
@@ -596,7 +626,7 @@
assertEquals(i,
readMessage.b.getMessage().getIntProperty("key").intValue());
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertEquals(NUM_MESSAGES * 3,
readMessage.b.getMessage().getIntProperty("key").intValue());
@@ -658,6 +688,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.createPersistentCursor(queue.getID(), null);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -686,13 +717,13 @@
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
- Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> pos = iterator.next();
assertNotNull("Null at position " + i, pos);
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
cursor.printDebug();
pgtxRollback.rollback();
@@ -703,13 +734,13 @@
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
- Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> pos = iterator.next();
assertNotNull(pos);
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
server.stop();
createServer();
@@ -733,9 +764,11 @@
PageCursorImpl cursor2 =
(PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 =
cursor.iterator();
int key = 0;
- while ((msg = cursor.moveNext()) != null)
+ while ((msg = iterator.next()) != null)
{
assertEquals(key++,
msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
@@ -748,7 +781,7 @@
for (int i = 0; i < 10; i++)
{
- msg = cursor2.moveNext();
+ msg = iterator2.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
}
@@ -791,9 +824,10 @@
msg = null;
cache = null;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
Pair<PagePosition, PagedMessage> msgCursor = null;
- while ((msgCursor = cursor.moveNext()) != null)
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);
@@ -840,8 +874,10 @@
cache = null;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator =
cursor.iterator();
+
Pair<PagePosition, PagedMessage> msgCursor = null;
- while ((msgCursor = cursor.moveNext()) != null)
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
}
@@ -856,7 +892,8 @@
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
key = initialKey;
- while ((msgCursor = cursor.moveNext()) != null)
+ iterator = cursor.iterator();
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++,
msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);