[hornetq-commits] JBoss hornetq SVN: r9785 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 13 20:45:50 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-13 20:45:50 -0400 (Wed, 13 Oct 2010)
New Revision: 9785

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing first step on cleanup after a whole page is consumed

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -38,6 +38,11 @@
    private final Page page;
    
    private boolean isLive = true;
+   
+   public String toString()
+   {
+      return "LivePacheCacheImpl::page=" + page.getPageId() + " number of messages=" + messages.size() + " isLive = " + isLive;
+   }
 
    // Static --------------------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -115,8 +115,13 @@
     */
    public boolean isLive()
    {
-      return true;
+      return false;
    }
+   
+   public String toString()
+   {
+      return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
+   }
 
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +57,14 @@
 
    // Attributes ----------------------------------------------------
 
+   private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+
+   private static void trace(final String message)
+   {
+      //PageCursorImpl.log.info(message);
+      System.out.println(message);
+   }
+
    private final StorageManager store;
 
    private final long cursorId;
@@ -190,6 +199,7 @@
     */
    public void reloadACK(final PagePosition position)
    {
+      System.out.println("reloading " + position);
       if (recoveredACK == null)
       {
          recoveredACK = new LinkedList<PagePosition>();
@@ -211,7 +221,10 @@
    {
       if (recoveredACK != null)
       {
-         System.out.println("********** processing reload!!!!!!!");
+         if (isTrace)
+         {
+            PageCursorImpl.trace("********** processing reload!!!!!!!");
+         }
          Collections.sort(recoveredACK);
 
          PagePosition previousPos = null;
@@ -260,7 +273,6 @@
             }
 
             previousPos = pos;
-            System.out.println("pos: " + pos);
          }
 
          recoveredACK.clear();
@@ -272,14 +284,15 @@
     * @param page
     * @return
     */
-   private PageCursorInfo getPageInfo(final PagePosition pos)
+   private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
    {
       PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
 
       if (pageInfo == null)
       {
          PageCache cache = cursorProvider.getPageCache(pos);
-         pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+         System.out.println("Number of Messages = " + cache.getNumberOfMessages());
+         pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
          consumedPages.put(pos.getPageNr(), pageInfo);
       }
 
@@ -302,6 +315,7 @@
    // The only exception is on non storage events such as not matching messages
    private void processACK(final PagePosition pos)
    {
+      System.out.println("Processing ack for " + pos);
       PageCursorInfo info = getPageInfo(pos);
 
       info.addACK(pos);
@@ -338,8 +352,6 @@
     */
    private void onPageDone(final PageCursorInfo info)
    {
-      System.out.println("Page " + info.getPageId() + " has completed");
-
       executor.execute(new Runnable()
       {
 
@@ -351,7 +363,7 @@
             }
             catch (Exception e)
             {
-               PageCursorImpl.log.warn("Error on cleaning up cursor pages");
+               PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
             }
          }
       });
@@ -368,7 +380,7 @@
 
       final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
 
-      // First get the completed pages using a lock   
+      // First get the completed pages using a lock
       synchronized (this)
       {
          for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
@@ -380,10 +392,31 @@
          }
       }
 
-      for (PageCursorInfo info : completedPages)
+      for (int i = 0; i < completedPages.size(); i++)
       {
+         PageCursorInfo info = completedPages.get(i);
+
+         boolean firstLine = true;
          for (PagePosition pos : info.acks)
          {
+            if (firstLine)
+            {
+               firstLine = false;
+               // We only do this check at the first line
+               PageCache cache = pos.getPageCache();
+               // The live cache has a hard reference on the PagingStoreImpl,
+               // So we are sure the reference would be filled on the PagePosition
+               if (cache != null && cache.isLive())
+               {
+                  completedPages.remove(i);
+                  break;
+               }
+               if (isTrace)
+               {
+                  PageCursorImpl.trace("Cleaning ACK records on page " + info.getPageId());
+               }
+            }
+
             if (pos.getRecordID() > 0)
             {
                store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
@@ -407,7 +440,10 @@
             {
                for (PageCursorInfo completePage : completedPages)
                {
-                  System.out.println("Removing page " + completePage.getPageId());
+                  if (isTrace)
+                  {
+                     PageCursorImpl.trace("Removing page " + completePage.getPageId());
+                  }
                   consumedPages.remove(completePage.getPageId());
                }
             }
@@ -430,19 +466,29 @@
       // Confirmed ACKs on this page
       private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
 
+      private WeakReference<PageCache> cache;
+
+      // The page was live at the time of the creation
+      private final boolean wasLive;
+
       // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
       // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
 
-      public PageCursorInfo(final long pageId, final int numberOfMessages)
+      public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
       {
          this.pageId = pageId;
          this.numberOfMessages = numberOfMessages;
+         wasLive = cache.isLive();
+         if (wasLive)
+         {
+            this.cache = new WeakReference<PageCache>(cache);
+         }
       }
 
       public boolean isDone()
       {
-         return numberOfMessages == confirmed.get();
+         return getNumberOfMessages() == confirmed.get();
       }
 
       /**
@@ -461,12 +507,43 @@
             acks.add(posACK);
          }
 
-         if (numberOfMessages == confirmed.incrementAndGet())
+         if (isTrace)
          {
+            PageCursorImpl.trace("numberOfMessages =  " + getNumberOfMessages() +
+                                 " confirmed =  " +
+                                 (confirmed.get() + 1) +
+                                 ", page = " +
+                                 pageId);
+         }
+
+         if (getNumberOfMessages() == confirmed.incrementAndGet())
+         {
             onPageDone(this);
          }
       }
 
+      private int getNumberOfMessages()
+      {
+         if (wasLive)
+         {
+            PageCache cache = this.cache.get();
+            if (cache != null)
+            {
+               return cache.getNumberOfMessages();
+            }
+            else
+            {
+               cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
+               this.cache = new WeakReference<PageCache>(cache);
+               return cache.getNumberOfMessages();
+            }
+         }
+         else
+         {
+            return numberOfMessages;
+         }
+      }
+
    }
 
    static class PageCursorTX implements TransactionOperation

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -194,6 +194,14 @@
    {
       activeCursors.clear();
    }
+   
+   public void printDebug()
+   {
+      for (PageCache cache: softCache.values())
+      {
+         System.out.println("Cache " + cache);
+      }
+   }
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -15,7 +15,6 @@
 
 import java.lang.ref.WeakReference;
 
-import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PagePosition;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -559,13 +559,7 @@
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
       
       Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
-      
-      LivePageCache pageCache = new LivePageCacheImpl(page);
-      
-      page.setLiveCache(pageCache);
 
-      cursorProvider.addPageCache(pageCache);
-      
       // To create the file 
       file.open();
 
@@ -1209,7 +1203,13 @@
          }
 
          currentPage = createPage(currentPageId);
+         
+         LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+         
+         currentPage.setLiveCache(pageCache);
 
+         cursorProvider.addPageCache(pageCache);
+         
          currentPageSize.set(0);
 
          currentPage.open();

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-14 00:45:50 UTC (rev 9785)
@@ -152,15 +152,18 @@
       
       System.out.println("Number of pages = " + numberOfPages);
       
-      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
-      System.out.println("cursorProvider = " + cursorProvider);
+      PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      cursorProvider.printDebug();
       
+      
       PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
       
       System.out.println("Cursor: " + cursor);
       for (int i = 0 ; i < 1000 ; i++)
       {
          Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         cursorProvider.printDebug();
+         assertNotNull(msg);
          assertEquals(i, msg.b.getIntProperty("key").intValue());
          
          if (i < 500)
@@ -328,7 +331,7 @@
          
          assertNotNull(readMessage);
          
-         // TODO: ack on live data
+         cursor.ack(readMessage.a);
          
          assertEquals(i, readMessage.b.getIntProperty("key").intValue());
          



More information about the hornetq-commits mailing list