[hornetq-commits] JBoss hornetq SVN: r9849 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 4 23:46:12 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-04 23:46:12 -0400 (Thu, 04 Nov 2010)
New Revision: 9849

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
backup

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-05 02:18:58 UTC (rev 9848)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-05 03:46:12 UTC (rev 9849)
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
@@ -90,8 +91,6 @@
 
    private final Executor executor;
 
-   private volatile PagePosition lastPosition;
-
    private volatile PagePosition lastAckedPosition;
 
    private List<PagePosition> recoveredACK;
@@ -156,13 +155,6 @@
 
    public void bookmark(PagePosition position) throws Exception
    {
-      if (lastPosition != null)
-      {
-         throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
-      }
-
-      lastPosition = position;
-
       PageCursorInfo cursorInfo = getPageInfo(position);
 
       if (position.getMessageNr() > 0)
@@ -175,7 +167,7 @@
 
    class CursorIterator implements LinkedListIterator<PagedReference>
    {
-      PagePosition position = getLastPosition();
+      PagePosition position = null;
 
       PagePosition lastOperation = null;
 
@@ -197,7 +189,7 @@
          {
             if (lastOperation == null)
             {
-               position = getLastPosition();
+               position = null;
             }
             else
             {
@@ -218,6 +210,7 @@
             cachedNext = null;
             return retPos;
          }
+         
          try
          {
             if (redeliveryIterator.hasNext())
@@ -229,6 +222,11 @@
             {
                isredelivery = false;
             }
+            
+            if (position == null)
+            {
+               position = getStartPosition();
+            }
 
             PagedReferenceImpl nextPos = moveNext(position);
             if (nextPos != null)
@@ -342,16 +340,38 @@
    /**
     * 
     */
-   private PagePosition getLastPosition()
+   private synchronized PagePosition getStartPosition()
    {
-      if (lastPosition == null)
+      // Get the first page not marked for deletion
+      // It's important to verify if it's not marked for deletion as you may have a pending request on the queue
+      for (Map.Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
       {
-         return new PagePositionImpl(pageStore.getFirstPage(), -1);
+         if (!entry.getValue().isPendingDelete())
+         {
+            if (entry.getValue().acks.isEmpty())
+            {
+               return new PagePositionImpl(entry.getKey(), -1);
+            }
+            else
+            {
+               // The list is not ordered...
+               // This is only done at creation of the queue, so we just scan instead of keeping the list ordered
+               PagePosition retValue = null;
+               
+               for (PagePosition pos : entry.getValue().acks)
+               {
+                  if (retValue == null || retValue.getMessageNr() < pos.getMessageNr())
+                  {
+                     retValue = pos;
+                  }
+               }
+               
+               return retValue;
+            }
+         }
       }
-      else
-      {
-         return lastPosition;
-      }
+
+      return new PagePositionImpl(pageStore.getFirstPage(), -1);
    }
 
    /* (non-Javadoc)
@@ -561,10 +581,10 @@
          Collections.sort(recoveredACK);
 
          boolean first = true;
-
-         PagePosition previousPos = null;
+         
          for (PagePosition pos : recoveredACK)
          {
+            lastAckedPosition = pos;
             PageCursorInfo positions = getPageInfo(pos);
             if (first)
             {
@@ -576,50 +596,8 @@
             }
 
             positions.addACK(pos);
-
-            lastPosition = pos;
-            if (previousPos != null)
-            {
-               if (!previousPos.isRightAfter(previousPos))
-               {
-                  PagePosition tmpPos = previousPos;
-                  // looking for holes on the ack list for redelivery
-                  while (true)
-                  {
-                     PagedReferenceImpl msgCheck = cursorProvider.getNext(this, tmpPos);
-
-                     positions = getPageInfo(tmpPos);
-
-                     // end of the hole, we can finish processing here
-                     // It may be also that the next was just a next page, so we just ignore it
-                     if (msgCheck == null || msgCheck.getPosition().equals(pos))
-                     {
-                        break;
-                     }
-                     else
-                     {
-                        if (match(msgCheck.getMessage()))
-                        {
-                           redeliver(msgCheck.getPosition());
-                        }
-                        else
-                        {
-                           // The reference was ignored. But we must take a count from the reference count
-                           // otherwise the page will never be deleted hence we would never leave paging even if
-                           // everything was consumed
-                           positions.confirmed.incrementAndGet();
-                        }
-                     }
-                     tmpPos = msgCheck.getPosition();
-                  }
-               }
-            }
-
-            previousPos = pos;
          }
 
-         lastAckedPosition = lastPosition;
-
          recoveredACK.clear();
          recoveredACK = null;
       }
@@ -948,11 +926,8 @@
 
       public void addACK(final PagePosition posACK)
       {
-         if (posACK.getRecordID() > 0)
-         {
-            // We store these elements for later cleanup
-            acks.add(posACK);
-         }
+         removedReferences.add(posACK);
+         acks.add(posACK);
 
          if (isTrace)
          {

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-05 02:18:58 UTC (rev 9848)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-05 03:46:12 UTC (rev 9849)
@@ -373,6 +373,8 @@
             cursor.ack(msg);
          }
       }
+      
+      server.getStorageManager().waitOnOperations();
 
       server.stop();
 
@@ -841,6 +843,10 @@
       PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
       cursor.bookmark(startingPos);
+
+      // We can't proceed until the operation has finished
+      server.getStorageManager().waitOnOperations();
+      
       PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
       msg.initMessage(server.getStorageManager());
       int initialKey = msg.getMessage().getIntProperty("key").intValue();



More information about the hornetq-commits mailing list