[hornetq-commits] JBoss hornetq SVN: r9771 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 11 19:37:26 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-11 19:37:26 -0400 (Mon, 11 Oct 2010)
New Revision: 9771

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on cursor ACKs

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-11 21:40:49 UTC (rev 9770)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-11 23:37:26 UTC (rev 9771)
@@ -17,16 +17,15 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.Page;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
@@ -126,7 +125,7 @@
 
             if (!match)
             {
-               confirmPagePosition(message.a);
+               processACK(message.a);
             }
          }
          
@@ -142,6 +141,18 @@
    public void ack(final PagePosition position) throws Exception
    {
       store.storeCursorAcknowledge(cursorId, position);
+      store.afterCompleteOperations(new IOAsyncTask()
+      {
+         
+         public void onError(int errorCode, String errorMessage)
+         {
+         }
+         
+         public void done()
+         {
+            processACK(position);
+         }
+      });
    }
 
    public void ackTx(final Transaction tx, final PagePosition position) throws Exception
@@ -173,7 +184,7 @@
     */
    public void reloadPreparedACK(final Transaction tx, final PagePosition position)
    {
-      internalAdd(position);
+      // internalAdd(position);
       installTXCallback(tx, position);
    }
 
@@ -189,8 +200,7 @@
          {
             PageCursorInfo positions = getPageInfo(pos);
             
-            positions.confirmed.incrementAndGet();
-            positions.acks.add(pos);
+            positions.addACK(pos);
 
             lastPosition = pos;
             if (previousPos != null)
@@ -270,14 +280,13 @@
 
    // Private -------------------------------------------------------
     
-   private void confirmPagePosition(final PagePosition pos)
+   // To be called only after the ACK has been processed and guaranteed to be on storage
+   // The only exception is on non-storage events such as non-matching messages
+   private void processACK(final PagePosition pos)
    {
       PageCursorInfo info = getPageInfo(pos);
       
-      if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
-      {
-         // todo delete previous destinations
-      }
+      info.addACK(pos);
    }
 
    /**
@@ -311,13 +320,21 @@
          tx.addOperation(cursorTX);
       }
       
+      cursorTX.addPositionConfirmation(this, position);
       
+      
    }
+   
+   // A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
+   private void onPageDone(PageCursorInfo info)
+   {
+      System.out.println("Page " + info.getPageId() + " has completed");
+   }
 
    // Inner classes -------------------------------------------------
    
    
-   private static class PageCursorInfo
+   private class PageCursorInfo
    {
       // Number of messages existent on this page
       private final int numberOfMessages;
@@ -325,7 +342,7 @@
       private final long pageId;
       
       // Confirmed ACKs on this page
-      private final List<PagePosition> acks = new LinkedList<PagePosition>();
+      private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
       
       // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions 
       private final AtomicInteger confirmed = new AtomicInteger(0);
@@ -343,6 +360,11 @@
       {
          return numberOfMessages;
       }
+      
+      public boolean isDone()
+      {
+         return numberOfMessages == confirmed.get();
+      }
 
       /**
        * @return the pageId
@@ -354,7 +376,16 @@
       
       public void addACK(final PagePosition posACK)
       {
-         this.acks.add(posACK);
+         if (posACK.getRecordID() > 0)
+         {
+            // We store these elements for later cleanup
+            this.acks.add(posACK);
+         }
+         
+         if (numberOfMessages == confirmed.incrementAndGet())
+         {
+            PageCursorImpl.this.onPageDone(this);
+         }
       }
       
     }
@@ -410,7 +441,7 @@
             
             for (PagePosition confirmed : positions)
             {
-               cursor.confirmPagePosition(confirmed);
+               cursor.processACK(confirmed);
             }
             
          }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-11 21:40:49 UTC (rev 9770)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-11 23:37:26 UTC (rev 9771)
@@ -115,6 +115,7 @@
       while ((msg = cursor.moveNext()) != null)
       {
          assertEquals(key++, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
       
@@ -270,21 +271,25 @@
       server.start();
       
       cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+      tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
       
       for (int i = 10; i <= 20; i++)
       {
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
          assertEquals(i, msg.b.getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         cursor.ackTx(tx,msg.a);
       }
     
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
          assertEquals(i, msg.b.getIntProperty("key").intValue());
-         cursor.ack(msg.a);
+         cursor.ackTx(tx,msg.a);
       }
       
+      tx.commit();
+      
    }
    
    



More information about the hornetq-commits mailing list