[hornetq-commits] JBoss hornetq SVN: r9755 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 5 18:39:35 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-10-05 18:39:35 -0400 (Tue, 05 Oct 2010)
New Revision: 9755
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding tx test around redelivery of the cursor
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 22:39:35 UTC (rev 9755)
@@ -129,6 +129,11 @@
{
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
installTXCallback(tx, position);
+
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
+
+
// tx.afterCommit()
}
@@ -179,6 +184,7 @@
{
Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
// end of the hole, we can finish processing here
+ // It may also be that the next position was just on the next page, so we just ignore it
if (msgCheck == null || msgCheck.a.equals(pos))
{
break;
@@ -240,6 +246,7 @@
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
+ //TODO: Play with rollbacks on the reference counts
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 22:39:35 UTC (rev 9755)
@@ -27,11 +27,14 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
@@ -153,13 +156,19 @@
PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
System.out.println("Cursor: " + cursor);
- for (int i = 0 ; i < 500 ; i++)
+ for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+
+ if (i < 500)
+ {
+ cursor.ack(msg.a);
+ }
}
+ OperationContextImpl.getContext(null).waitCompletion();
+
server.stop();
server.start();
@@ -173,6 +182,8 @@
cursor.ack(msg.a);
}
+
+
}
@@ -225,11 +236,67 @@
}
+ public void testRestartWithHoleOnAckAndTransaction() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+ for (int i = 0 ; i < 100 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ackTx(tx, msg.a);
+ }
+ }
+
+ tx.commit();
+
+ server.stop();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 10; i <= 20; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
public void testRollbackScenarios() throws Exception
{
}
+ public void testPrepareScenarios() throws Exception
+ {
+
+ }
+
public void testRedeliveryScenarios() throws Exception
{
@@ -297,7 +364,7 @@
Configuration config = createDefaultConfig();
- config.setJournalSyncNonTransactional(false);
+ config.setJournalSyncNonTransactional(true);
server = createServer(true,
config,
More information about the hornetq-commits
mailing list