[hornetq-commits] JBoss hornetq SVN: r9832 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 1 23:33:08 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-01 23:33:07 -0400 (Mon, 01 Nov 2010)
New Revision: 9832

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
.remove on iterators

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 02:18:28 UTC (rev 9831)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-02 03:33:07 UTC (rev 9832)
@@ -17,10 +17,12 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -44,6 +46,7 @@
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.LinkedListImpl;
 import org.hornetq.utils.LinkedListIterator;
@@ -243,6 +246,7 @@
        */
       public void remove()
       {
+         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
       }
 
       /* (non-Javadoc)
@@ -265,6 +269,8 @@
    {
       return new CursorIterator();
    }
+   
+   int validCount = 0;
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
@@ -280,9 +286,22 @@
       do
       {
          message = cursorProvider.getNext(this, tmpPosition);
-
-         if (message != null)
+         
+         boolean valid = true;
+         if (message == null)
          {
+            valid = false;
+         }
+         else
+         {
+            PageCursorInfo info = getPageInfo(message.a, false);
+            if (info != null && info.isRemoved(message.a))
+            {
+               valid = false;
+            }
+         }
+         if (valid)
+         {
             tmpPosition = message.a;
 
             match = match(message.b.getMessage());
@@ -603,16 +622,21 @@
          System.out.println(info);
       }
    }
+  
+   private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+   {
+      return getPageInfo(pos, true);
+   }
 
    /**
     * @param page
     * @return
     */
-   private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
+   private synchronized PageCursorInfo getPageInfo(final PagePosition pos, boolean create)
    {
       PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
 
-      if (pageInfo == null)
+      if (create && pageInfo == null)
       {
          PageCache cache = cursorProvider.getPageCache(pos);
          pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
@@ -822,6 +846,8 @@
       private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
 
       private WeakReference<PageCache> cache;
+      
+      private Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
 
       // The page was live at the time of the creation
       private final boolean wasLive;
@@ -878,6 +904,17 @@
       {
          return pageId;
       }
+      
+      public boolean isRemoved(final PagePosition pos)
+      {
+         return false;
+         //return removedReferences.contains(pos);
+      }
+      
+      public void remove(final PagePosition position)
+      {
+         removedReferences.add(position);
+      }
 
       public void addACK(final PagePosition posACK)
       {

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 02:18:28 UTC (rev 9831)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-02 03:33:07 UTC (rev 9832)
@@ -757,8 +757,8 @@
 
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
-      PageSubscription cursor = cursorProvider.createSubscription(1, null, false);
-      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(2, null, false);
+      PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
+      PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
       
       queue.getPageSubscription().close();
 
@@ -776,8 +776,6 @@
 
       forceGC();
 
-      //assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
       for (int i = 0; i < 10; i++)
       {
          msg = iterator2.next();
@@ -788,6 +786,7 @@
 
       cursor2.close();
       
+       
       lookupPageStore(ADDRESS).flushExecutors();
 
       server.stop();



More information about the hornetq-commits mailing list