[hornetq-commits] JBoss hornetq SVN: r9787 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 14 17:40:20 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-14 17:40:19 -0400 (Thu, 14 Oct 2010)
New Revision: 9787

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Improving cleanup 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-14 21:40:19 UTC (rev 9787)
@@ -29,6 +29,8 @@
 
    // Cursor query operations --------------------------------------
    
+   void stop();
+   
    Pair<PagePosition, ServerMessage> moveNext() throws Exception;
 
    void ack(PagePosition position) throws Exception;
@@ -55,4 +57,6 @@
     * @param position
     */
    void redeliver(PagePosition position);
+   
+   void printDebug();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
@@ -41,6 +41,7 @@
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
 
 /**
  * A PageCursorImpl
@@ -57,11 +58,11 @@
 
    // Attributes ----------------------------------------------------
 
-   private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+   private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
 
    private static void trace(final String message)
    {
-      //PageCursorImpl.log.info(message);
+      // PageCursorImpl.log.info(message);
       System.out.println(message);
    }
 
@@ -77,6 +78,8 @@
 
    private volatile PagePosition lastPosition;
 
+   private volatile PagePosition lastAckedPosition;
+
    private List<PagePosition> recoveredACK;
 
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
@@ -275,11 +278,29 @@
             previousPos = pos;
          }
 
+         this.lastAckedPosition = lastPosition;
+
          recoveredACK.clear();
          recoveredACK = null;
       }
    }
+   
+   public void stop()
+   {
+      Future future = new Future();
+      executor.execute(future);
+      future.await(1000);
+   }
 
+   public void printDebug()
+   {
+      System.out.println("Debug information on PageCurorImpl- " + this);
+      for (PageCursorInfo info : consumedPages.values())
+      {
+         System.out.println(info);
+      }
+   }
+
    /**
     * @param page
     * @return
@@ -315,7 +336,10 @@
    // The only exception is on non storage events such as not matching messages
    private void processACK(final PagePosition pos)
    {
-      System.out.println("Processing ack for " + pos);
+      if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
+      {
+         this.lastAckedPosition = pos;
+      }
       PageCursorInfo info = getPageInfo(pos);
 
       info.addACK(pos);
@@ -387,7 +411,14 @@
          {
             if (entry.getValue().isDone())
             {
-               completedPages.add(entry.getValue());
+               if (entry.getKey() == lastAckedPosition.getPageNr())
+               {
+                  System.out.println("We can't clear page " + entry.getKey() + " now since it's the current page");
+               }
+               else
+               {
+                  completedPages.add(entry.getValue());
+               }
             }
          }
       }
@@ -396,27 +427,8 @@
       {
          PageCursorInfo info = completedPages.get(i);
 
-         boolean firstLine = true;
          for (PagePosition pos : info.acks)
          {
-            if (firstLine)
-            {
-               firstLine = false;
-               // We only do this check at the first line
-               PageCache cache = pos.getPageCache();
-               // The live cache has a hard reference on the PagingStoreImpl,
-               // So we are sure the reference would be filled on the PagePosition
-               if (cache != null && cache.isLive())
-               {
-                  completedPages.remove(i);
-                  break;
-               }
-               if (isTrace)
-               {
-                  PageCursorImpl.trace("Cleaning ACK records on page " + info.getPageId());
-               }
-            }
-
             if (pos.getRecordID() > 0)
             {
                store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
@@ -444,6 +456,7 @@
                   {
                      PageCursorImpl.trace("Removing page " + completePage.getPageId());
                   }
+                  System.out.println("Removing page " + completePage.getPageId());
                   consumedPages.remove(completePage.getPageId());
                }
             }
@@ -475,6 +488,11 @@
       // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
 
+      public String toString()
+      {
+         return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage = " + numberOfMessages;
+      }
+
       public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
       {
          this.pageId = pageId;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
@@ -192,6 +192,11 @@
 
    public void stop()
    {
+      for (PageCursor cursor : activeCursors.values())
+      {
+         cursor.stop();
+      }
+      
       activeCursors.clear();
    }
    

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-14 21:40:19 UTC (rev 9787)
@@ -25,6 +25,7 @@
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -153,45 +154,61 @@
       System.out.println("Number of pages = " + numberOfPages);
       
       PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      cursorProvider.printDebug();
       
       
       PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ 
+      PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
+
+      int firstPageSize = firstPage.getNumberOfMessages();
       
+      firstPage = null;
+      
       System.out.println("Cursor: " + cursor);
+      cursorProvider.printDebug();
+
       for (int i = 0 ; i < 1000 ; i++)
       {
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         cursorProvider.printDebug();
          assertNotNull(msg);
          assertEquals(i, msg.b.getIntProperty("key").intValue());
          
-         if (i < 500)
+         if (i < firstPageSize)
          {
             cursor.ack(msg.a);
          }
       }
+      cursorProvider.printDebug();
+     
+      // The context must be cleared because the same thread is used across two distinct servers;
+      // otherwise we would get the old executor from the factory
+      OperationContextImpl.clearContext();
       
-      OperationContextImpl.getContext(null).waitCompletion();
-      
       server.stop();
       
       server.start();
       
       cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
       
-      for (int i = 500; i < NUM_MESSAGES; i++)
+      for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
+         System.out.println("Received " + i);
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertNotNull(msg);
          assertEquals(i, msg.b.getIntProperty("key").intValue());
+         
          cursor.ack(msg.a);
+         
+         OperationContextImpl.getContext(null).waitCompletion();
+         
       }
+
+      OperationContextImpl.getContext(null).waitCompletion(); 
+      ((PageCursorImpl)cursor).printDebug();
       
       
-      
    }
    
-   
    public void testRestartWithHoleOnAck() throws Exception
    {
 
@@ -424,6 +441,7 @@
    protected void setUp() throws Exception
    {
       super.setUp();
+      OperationContextImpl.clearContext();
       System.out.println("Tmp:" + getTemporaryDir());
       
       Configuration config = createDefaultConfig();
@@ -445,6 +463,7 @@
 
    protected void tearDown() throws Exception
    {
+      OperationContextImpl.clearContext();
       server.stop();
       super.tearDown();
    }



More information about the hornetq-commits mailing list