[hornetq-commits] JBoss hornetq SVN: r9813 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 25 20:08:05 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-25 20:08:05 -0400 (Mon, 25 Oct 2010)
New Revision: 9813

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Clear depage logic

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -93,4 +93,14 @@
    void addSize(int size);
    
    void executeRunnableWhenMemoryAvailable(Runnable runnable);
+   
+   /** This method will hold any producer, but it waits for pending operations to finish before acquiring the write lock. */
+   void lock();
+   
+   /** 
+    * 
+    * Call this method using the same thread used by the last call of {@link PagingStore#lock()}
+    * 
+    */
+    void unlock();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -79,4 +79,10 @@
    void redeliver(PagePosition position);
    
    void printDebug();
+
+   /**
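+    * @param minPage the id of the page to check
+    * @return true if this cursor has information about the given page and that page is marked as done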
+    */
+   boolean isComplete(long minPage);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -271,6 +271,15 @@
       processACK(position);
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
+    */
+   public boolean isComplete(long page)
+   {
+      PageCursorInfo info = consumedPages.get(page);
+      return info != null && info.isDone();
+   }
+
    /**
     * All the data associated with the cursor should go away here
     */

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -14,7 +14,9 @@
 package org.hornetq.core.paging.cursor.impl;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
@@ -66,7 +68,7 @@
 
    private final Executor executor;
 
-   private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+   private Map<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
 
    private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
 
@@ -85,7 +87,7 @@
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
       this.executor = executorFactory.getExecutor();
-    }
+   }
 
    // Public --------------------------------------------------------
 
@@ -93,16 +95,21 @@
    {
       return pagingStore;
    }
-   
-   public PageCursor createPersistentCursor(long cursorID, Filter filter)
+
+   public synchronized PageCursor createPersistentCursor(long cursorID, Filter filter)
    {
       PageCursor activeCursor = activeCursors.get(cursorID);
       if (activeCursor != null)
       {
          throw new IllegalStateException("Cursor " + cursorID + " had already been created");
       }
-      
-      activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+
+      activeCursor = new PageCursorImpl(this,
+                                        pagingStore,
+                                        storageManager,
+                                        executorFactory.getExecutor(),
+                                        filter,
+                                        cursorID);
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
@@ -110,7 +117,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
     */
-   public PageCursor getPersistentCursor(long cursorID)
+   public synchronized PageCursor getPersistentCursor(long cursorID)
    {
       return activeCursors.get(cursorID);
    }
@@ -118,9 +125,14 @@
    /**
     * this will create a non-persistent cursor
     */
-   public PageCursor createNonPersistentCursor(Filter filter)
+   public synchronized PageCursor createNonPersistentCursor(Filter filter)
    {
-      PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, 0);
+      PageCursor cursor = new PageCursorImpl(this,
+                                             pagingStore,
+                                             storageManager,
+                                             executorFactory.getExecutor(),
+                                             filter,
+                                             0);
       nonPersistentCursors.add(cursor);
       return cursor;
    }
@@ -230,14 +242,20 @@
       return cache;
    }
 
-   public synchronized void addPageCache(PageCache cache)
+   public void addPageCache(PageCache cache)
    {
-      softCache.put(cache.getPageId(), cache);
+      synchronized (softCache)
+      {
+         softCache.put(cache.getPageId(), cache);
+      }
    }
 
-   public synchronized int getCacheSize()
+   public int getCacheSize()
    {
-      return softCache.size();
+      synchronized (softCache)
+      {
+         return softCache.size();
+      }
    }
 
    public void processReload() throws Exception
@@ -302,24 +320,70 @@
 
    private void cleanup()
    {
-      long minPage = getMinPageInUse();
+      ArrayList<Page> depagedPages = new ArrayList<Page>();
 
-      try
+      pagingStore.lock();
+
+      synchronized (this)
       {
-         for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+         try
          {
-            Page page = pagingStore.depage();
-            if (page != null)
+            ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+            cursorList.addAll(activeCursors.values());
+            cursorList.addAll(nonPersistentCursors);
+
+            long minPage = checkMinPage(cursorList);
+            
+            if (minPage == pagingStore.getCurrentWritingPage())
             {
-               System.out.println("Deleting " + page);
-               page.delete();
+               boolean complete = true;
+               
+               for (PageCursor cursor: cursorList)
+               {
+                  if (!cursor.isComplete(minPage))
+                  {
+                     complete = false;
+                     break;
+                  }
+               }
+               
+               if (complete)
+               {
+                  System.out.println("Depaging complete now. We can leave page state at this point!");
+                  // TODO: move every cursor away from the main page, clearing every cursor's old pages while keeping only a bookmark for the next page in case it happens again
+               }
             }
+
+            for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+            {
+               Page page = pagingStore.depage();
+               depagedPages.add(page);
+            }
          }
+         catch (Exception ex)
+         {
+            log.warn("Couldn't complete cleanup on paging", ex);
+            return;
+         }
+         finally
+         {
+            pagingStore.unlock();
+         }
       }
+
+      try
+      {
+         for (Page depagedPage : depagedPages)
+         {
+            depagedPage.delete();
+         }
+      }
       catch (Exception ex)
       {
          log.warn("Couldn't complete cleanup on paging", ex);
+         return;
       }
+
    }
 
    public void printDebug()
@@ -343,15 +407,11 @@
 
    // Private -------------------------------------------------------
 
-   private long getMinPageInUse()
+   /**
+    * This method is not synchronized itself: call it while holding the lock on this provider (as cleanup() does) so that it is atomic with the cursors being used
+    */
+   private long checkMinPage(List<PageCursor> cursorList)
    {
-      ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
-      synchronized (this)
-      {
-         cursorList.addAll(activeCursors.values());
-         cursorList.addAll(nonPersistentCursors);
-      }
-
       if (cursorList.size() == 0)
       {
          return 0l;
@@ -378,7 +438,7 @@
       {
          boolean needToRead = false;
          PageCache cache = null;
-         synchronized (this)
+         synchronized (softCache)
          {
             if (pageId > pagingStore.getCurrentWritingPage())
             {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -217,6 +217,16 @@
 
    // PagingStore implementation ------------------------------------
    
+   public void lock()
+   {
+      writeLock.lock();
+   }
+   
+   public void unlock()
+   {
+      writeLock.unlock();
+   }
+   
    public PageCursorProvider getCursorProvier()
    {
       return cursorProvider;

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-26 00:08:05 UTC (rev 9813)
@@ -272,6 +272,7 @@
 
       for (int i = 0; i < 1000; i++)
       {
+         System.out.println("Reading Msg : " + i);
          Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
          assertNotNull(msg);
          assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());



More information about the hornetq-commits mailing list