[hornetq-commits] JBoss hornetq SVN: r9755 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 5 18:39:35 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-05 18:39:35 -0400 (Tue, 05 Oct 2010)
New Revision: 9755

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding tx test around redelivery of the cursor

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 22:39:35 UTC (rev 9755)
@@ -129,6 +129,11 @@
    {
       store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
       installTXCallback(tx, position);
+
+      // It needs to persist, otherwise the cursor will return to the first page position
+      tx.setContainsPersistent();
+      
+      
       // tx.afterCommit()
    }
 
@@ -179,6 +184,7 @@
                   {
                      Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
                      // end of the hole, we can finish processing here
+                     // It may also be that the next message was simply on the next page, so we just ignore it
                      if (msgCheck == null || msgCheck.a.equals(pos))
                      {
                         break;
@@ -240,6 +246,7 @@
     */
    private void installTXCallback(Transaction tx, PagePosition position)
    {
+      //TODO: Play with rollbacks on the reference counts
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-05 22:39:35 UTC (rev 9755)
@@ -27,11 +27,14 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -153,13 +156,19 @@
       PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
-      for (int i = 0 ; i < 500 ; i++)
+      for (int i = 0 ; i < 1000 ; i++)
       {
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
          assertEquals(i, msg.b.getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         
+         if (i < 500)
+         {
+            cursor.ack(msg.a);
+         }
       }
       
+      OperationContextImpl.getContext(null).waitCompletion();
+      
       server.stop();
       
       server.start();
@@ -173,6 +182,8 @@
          cursor.ack(msg.a);
       }
       
+      
+      
    }
    
    
@@ -225,11 +236,67 @@
    }
    
    
+   public void testRestartWithHoleOnAckAndTransaction() throws Exception
+   {
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+      
+      System.out.println("Number of pages = " + numberOfPages);
+      
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+      
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      System.out.println("Cursor: " + cursor);
+      
+      Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+      for (int i = 0 ; i < 100 ; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         if (i < 10 || i > 20)
+         {
+            cursor.ackTx(tx, msg.a);
+         }
+      }
+      
+      tx.commit();
+      
+      server.stop();
+      
+      server.start();
+      
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      for (int i = 10; i <= 20; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+    
+      for (int i = 100; i < NUM_MESSAGES; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+      
+   }
+   
+   
    public void testRollbackScenarios() throws Exception
    {
       
    }
    
+   public void testPrepareScenarios() throws Exception
+   {
+      
+   }
+   
    public void testRedeliveryScenarios() throws Exception
    {
       
@@ -297,7 +364,7 @@
       
       Configuration config = createDefaultConfig();
       
-      config.setJournalSyncNonTransactional(false);
+      config.setJournalSyncNonTransactional(true);
 
       server = createServer(true,
                             config,



More information about the hornetq-commits mailing list