[hornetq-commits] JBoss hornetq SVN: r9817 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 27 17:50:48 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-27 17:50:48 -0400 (Wed, 27 Oct 2010)
New Revision: 9817

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
cleanup page support

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -83,13 +83,22 @@
    Page depage() throws Exception;
 
 
+   void forceAnotherPage() throws Exception;
 
+   Page getCurrentPage();
+
+
    /**
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
    boolean startDepaging();
+   
+   /** @return true if paging was started, or false if paging was already started before this call */
+   boolean startPaging() throws Exception;
 
+   void stopPaging() throws Exception;
+
    void addSize(int size);
    
    void executeRunnableWhenMemoryAvailable(Runnable runnable);
@@ -103,4 +112,8 @@
     * 
     */
     void unlock();
+
+    /** This is used mostly by tests.
+     *  We will wait for any pending runnable to finish its execution */
+    void flushExecutors();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -40,6 +40,14 @@
    // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void close() throws Exception;
    
+   void scheduleCleanupCheck();
+   
+   void cleanupEntries() throws Exception;
+   
+   void disableAutoCleanup();
+   
+   void enableAutoCleanup();
+   
    Pair<PagePosition, PagedMessage> moveNext() throws Exception;
 
    void ack(PagePosition position) throws Exception;
@@ -85,4 +93,6 @@
     * @return
     */
    boolean isComplete(long minPage);
+
+   void flushExecutors();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -66,8 +66,13 @@
    void processReload() throws Exception;
 
    void stop();
+   
+   void flushExecutors();
 
    void scheduleCleanup();
+   
+   // Perform the cleanup on the caller's thread (for startup and recovery)
+   void cleanup();
 
    /**
     * @param pageCursorImpl

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -68,10 +68,12 @@
       System.out.println(message);
    }
 
+   private volatile boolean autoCleanup = true;
+
    private final StorageManager store;
 
    private final long cursorId;
-   
+
    private final Filter filter;
 
    private final PagingStore pageStore;
@@ -112,6 +114,16 @@
 
    // Public --------------------------------------------------------
 
+   public void disableAutoCleanup()
+   {
+      autoCleanup = false;
+   }
+
+   public void enableAutoCleanup()
+   {
+      autoCleanup = true;
+   }
+
    public PageCursorProvider getProvider()
    {
       return cursorProvider;
@@ -438,7 +450,7 @@
       }
    }
 
-   public void stop()
+   public void flushExecutors()
    {
       Future future = new Future();
       executor.execute(future);
@@ -448,6 +460,11 @@
       }
    }
 
+   public void stop()
+   {
+      flushExecutors();
+   }
+
    public void printDebug()
    {
       printDebug(toString());
@@ -507,7 +524,10 @@
          if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
          {
             // there's a different page being acked, we will do the check right away
-            scheduleCleanupCheck();
+            if (autoCleanup)
+            {
+               scheduleCleanupCheck();
+            }
          }
          lastAckedPosition = pos;
       }
@@ -547,32 +567,38 @@
     */
    private void onPageDone(final PageCursorInfo info)
    {
-      scheduleCleanupCheck();
+      if (autoCleanup)
+      {
+         scheduleCleanupCheck();
+      }
    }
 
-   private void scheduleCleanupCheck()
+   public void scheduleCleanupCheck()
    {
-      executor.execute(new Runnable()
+      if (autoCleanup)
       {
+         executor.execute(new Runnable()
+         {
 
-         public void run()
-         {
-            try
+            public void run()
             {
-               cleanupPages();
+               try
+               {
+                  cleanupEntries();
+               }
+               catch (Exception e)
+               {
+                  PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
+               }
             }
-            catch (Exception e)
-            {
-               PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
-            }
-         }
-      });
+         });
+      }
    }
 
    /** 
     * It will cleanup all the records for completed pages
     * */
-   private void cleanupPages() throws Exception
+   public void cleanupEntries() throws Exception
    {
       Transaction tx = new TransactionImpl(store);
 
@@ -687,7 +713,11 @@
       @Override
       public String toString()
       {
-         return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + confirmed;
+         return "PageCursorInfo::PageID=" + pageId +
+                " numberOfMessage = " +
+                numberOfMessages +
+                ", confirmed = " +
+                confirmed;
       }
 
       public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -264,6 +264,9 @@
       {
          cursor.processReload();
       }
+
+      cleanup();
+
    }
 
    public void stop()
@@ -289,6 +292,29 @@
 
    }
 
+   public void flushExecutors()
+   {
+      for (PageCursor cursor : activeCursors.values())
+      {
+         cursor.flushExecutors();
+      }
+
+      for (PageCursor cursor : nonPersistentCursors)
+      {
+         cursor.flushExecutors();
+      }
+
+      Future future = new Future();
+
+      executor.execute(future);
+
+      while (!future.await(10000))
+      {
+         log.warn("Waiting cursor provider " + this + " to finish executors");
+      }
+
+   }
+
    public void close(PageCursor cursor)
    {
       if (cursor.getId() != 0)
@@ -318,7 +344,7 @@
       });
    }
 
-   private void cleanup()
+   public void cleanup()
    {
       ArrayList<Page> depagedPages = new ArrayList<Page>();
 
@@ -328,17 +354,22 @@
       {
          try
          {
+            if (!pagingStore.isStarted())
+            {
+               return;
+            }
+
             ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
             cursorList.addAll(activeCursors.values());
             cursorList.addAll(nonPersistentCursors);
 
             long minPage = checkMinPage(cursorList);
-            
-            if (minPage == pagingStore.getCurrentWritingPage())
+
+            if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
             {
                boolean complete = true;
-               
-               for (PageCursor cursor: cursorList)
+
+               for (PageCursor cursor : cursorList)
                {
                   if (!cursor.isComplete(minPage))
                   {
@@ -346,19 +377,59 @@
                      break;
                   }
                }
-               
+
                if (complete)
                {
-                  System.out.println("Depaging complete now. We can leave page state at this point!");
-                  // move every cursor away from the main page, clearing every cursor's old pages while only keeping a bookmark for the next page case it happens again
+
+                  System.out.println("Disabling depage!");
+                  pagingStore.forceAnotherPage();
+
+                  Page currentPage = pagingStore.getCurrentPage();
+
+                  try
+                  {
+                     // First step: Move every cursor to the next bookmarked page (that was just created)
+                     for (PageCursor cursor : cursorList)
+                     {
+                        cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
+                     }
+
+                     storageManager.waitOnOperations();
+                  }
+                  finally
+                  {
+                     for (PageCursor cursor : cursorList)
+                     {
+                        cursor.enableAutoCleanup();
+                     }
+                  }
+
+                  pagingStore.stopPaging();
+
+                  // This has to be called after we stopped paging
+                  for (PageCursor cursor : cursorList)
+                  {
+                     cursor.scheduleCleanupCheck();
+                  }
+
                }
             }
 
             for (long i = pagingStore.getFirstPage(); i < minPage; i++)
             {
                Page page = pagingStore.depage();
+               if (page == null)
+               {
+                  break;
+               }
                depagedPages.add(page);
             }
+
+            if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 &&
+                pagingStore.getCurrentPage().getNumberOfMessages() == 0)
+            {
+               pagingStore.stopPaging();
+            }
          }
          catch (Exception ex)
          {
@@ -412,11 +483,6 @@
     */
    private long checkMinPage(List<PageCursor> cursorList)
    {
-      if (cursorList.size() == 0)
-      {
-         return 0l;
-      }
-
       long minPage = Long.MAX_VALUE;
 
       for (PageCursor cursor : cursorList)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -113,18 +113,15 @@
    private volatile int currentPageId;
 
    private volatile Page currentPage;
+   
+   private volatile boolean paging = false;
 
-   private final ReentrantLock writeLock = new ReentrantLock();
-
    /** duplicate cache used at this address */
    private final DuplicateIDCache duplicateCache;
-   
+
    private final PageCursorProvider cursorProvider;
 
-   /** 
-    * We need to perform checks on currentPage with minimal locking
-    * */
-   private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
+   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
    private volatile boolean running = false;
 
@@ -193,7 +190,7 @@
       this.storeFactory = storeFactory;
 
       this.syncNonTransactional = syncNonTransactional;
-      
+
       this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
 
       // Post office could be null on the backup node
@@ -209,34 +206,34 @@
    }
 
    // Public --------------------------------------------------------
-   
+
    public String toString()
    {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
    // PagingStore implementation ------------------------------------
-   
+
    public void lock()
    {
-      writeLock.lock();
+      lock.writeLock().lock();
    }
-   
+
    public void unlock()
    {
-      writeLock.unlock();
+      lock.writeLock().unlock();
    }
-   
+
    public PageCursorProvider getCursorProvier()
    {
       return cursorProvider;
    }
-   
+
    public long getFirstPage()
    {
       return firstPageId;
    }
-   
+
    public long getTopPage()
    {
       return currentPageId;
@@ -269,7 +266,7 @@
 
    public boolean isPaging()
    {
-      currentPageLock.readLock().lock();
+      lock.readLock().lock();
 
       try
       {
@@ -283,12 +280,12 @@
          }
          else
          {
-            return currentPage != null;
+            return paging;
          }
       }
       finally
       {
-         currentPageLock.readLock().unlock();
+         lock.readLock().unlock();
       }
    }
 
@@ -296,7 +293,7 @@
    {
       return numberOfPages;
    }
-   
+
    public int getCurrentWritingPage()
    {
       return currentPageId;
@@ -322,7 +319,7 @@
 
    public void sync() throws Exception
    {
-      currentPageLock.readLock().lock();
+      lock.readLock().lock();
 
       try
       {
@@ -333,18 +330,17 @@
       }
       finally
       {
-         currentPageLock.readLock().unlock();
+         lock.readLock().unlock();
       }
    }
 
    public boolean startDepaging()
    {
-      
+
       // Disabled for now
-      
+
       return false;
-      
-      
+
       /*
       if (!running)
       {
@@ -384,7 +380,6 @@
          currentPageLock.readLock().unlock();
       } */
    }
-   
 
    public void processReload() throws Exception
    {
@@ -398,7 +393,7 @@
 
    // HornetQComponent implementation
 
-   public synchronized boolean isStarted()
+   public boolean isStarted()
    {
       return running;
    }
@@ -407,21 +402,13 @@
    {
       if (running)
       {
-         
+
          cursorProvider.stop();
 
          running = false;
 
-         Future future = new Future();
+         flushExecutors();
 
-         executor.execute(future);
-
-         if (!future.await(60000))
-         {
-            PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
-         }
-         
-
          if (currentPage != null)
          {
             currentPage.close();
@@ -429,10 +416,24 @@
          }
       }
    }
+   
+   public void flushExecutors()
+   {
+      cursorProvider.flushExecutors();
+      
+      Future future = new Future();
 
+      executor.execute(future);
+
+      if (!future.await(60000))
+      {
+         PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
+      }
+   }
+
    public void start() throws Exception
    {
-      writeLock.lock();
+      lock.writeLock().lock();
 
       try
       {
@@ -448,80 +449,78 @@
          }
          else
          {
-            currentPageLock.writeLock().lock();
+            running = true;
+            firstPageId = Integer.MAX_VALUE;
 
-            try
+            // There are no files yet on this Storage. We will just return it empty
+            if (fileFactory != null)
             {
-               running = true;
-               firstPageId = Integer.MAX_VALUE;
 
-               // There are no files yet on this Storage. We will just return it empty
-               if (fileFactory != null)
-               {
+               currentPageId = 0;
+               currentPage = null;
 
-                  currentPageId = 0;
-                  currentPage = null;
+               List<String> files = fileFactory.listFiles("page");
 
-                  List<String> files = fileFactory.listFiles("page");
+               numberOfPages = files.size();
 
-                  numberOfPages = files.size();
+               for (String fileName : files)
+               {
+                  final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
 
-                  for (String fileName : files)
+                  if (fileId > currentPageId)
                   {
-                     final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
-
-                     if (fileId > currentPageId)
-                     {
-                        currentPageId = fileId;
-                     }
-
-                     if (fileId < firstPageId)
-                     {
-                        firstPageId = fileId;
-                     }
+                     currentPageId = fileId;
                   }
-                  
-                  if (currentPageId != 0)
+
+                  if (fileId < firstPageId)
                   {
-                     currentPage = createPage(currentPageId);
-                     currentPage.open();
-                     
-                     List<PagedMessage> messages = currentPage.read();
-                     
-                     LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-                     
-                     for (PagedMessage msg : messages)
-                     {
-                        msg.initMessage(storageManager);
-                        pageCache.addLiveMessage(msg);
-                     }
-                     
-                     currentPage.setLiveCache(pageCache);
-                     
-                     currentPageSize.set(currentPage.getSize());
-                     
-                     cursorProvider.addPageCache(pageCache);
+                     firstPageId = fileId;
                   }
-                  
-                  if (currentPage != null)
+               }
+
+               if (currentPageId != 0)
+               {
+                  currentPage = createPage(currentPageId);
+                  currentPage.open();
+
+                  List<PagedMessage> messages = currentPage.read();
+
+                  LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+                  for (PagedMessage msg : messages)
                   {
-                     
-                     startPaging();
+                     msg.initMessage(storageManager);
+                     pageCache.addLiveMessage(msg);
                   }
+
+                  currentPage.setLiveCache(pageCache);
+
+                  currentPageSize.set(currentPage.getSize());
+
+                  cursorProvider.addPageCache(pageCache);
                }
+               
+               // We will not mark it for paging if there's only a single empty file
+               if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
+               {
+                  startPaging();
+               }
             }
-            finally
-            {
-               currentPageLock.writeLock().unlock();
-            }
          }
 
       }
       finally
       {
-         writeLock.unlock();
+         lock.writeLock().unlock();
       }
    }
+   
+   public void stopPaging()
+   {
+      lock.writeLock().lock();
+      paging = false;
+      lock.writeLock().unlock();
+   }
 
    public boolean startPaging()
    {
@@ -530,28 +529,30 @@
          return false;
       }
 
-      // First check without any global locks.
-      // (Faster)
-      currentPageLock.readLock().lock();
+      lock.readLock().lock();
       try
       {
-         // Already paging, nothing to be done
-         if (currentPage != null)
+         if (paging)
          {
             return false;
          }
       }
       finally
       {
-         currentPageLock.readLock().unlock();
+         lock.readLock().unlock();
       }
 
       // if the first check failed, we do it again under a global currentPageLock
       // (writeLock) this time
-      writeLock.lock();
+      lock.writeLock().lock();
 
       try
       {
+         if (paging)
+         {
+            return false;
+         }
+         
          if (currentPage == null)
          {
             try
@@ -565,17 +566,15 @@
                PagingStoreImpl.log.warn("IO Error, impossible to start paging", e);
                return false;
             }
-
-            return true;
          }
-         else
-         {
-            return false;
-         }
+
+         paging = true;
+         
+         return true;
       }
       finally
       {
-         writeLock.unlock();
+         lock.writeLock().unlock();
       }
    }
 
@@ -594,22 +593,19 @@
       }
 
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-      
+
       Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
 
-      // To create the file 
+      // To create the file
       file.open();
 
       file.position(0);
 
       file.close();
-      
 
       return page;
    }
 
-   // TestSupportPageStore ------------------------------------------
-
    public void forceAnotherPage() throws Exception
    {
       openNewPage();
@@ -625,9 +621,7 @@
     * */
    public Page depage() throws Exception
    {
-      writeLock.lock();
-
-      currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+      lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
       try
       {
          if (!running)
@@ -689,8 +683,7 @@
       }
       finally
       {
-         currentPageLock.writeLock().unlock();
-         writeLock.unlock();
+         lock.writeLock().unlock();
       }
 
    }
@@ -922,26 +915,26 @@
       }
 
       // We need to ensure a read lock, as depage could change the paging state
-      currentPageLock.readLock().lock();
+      lock.readLock().lock();
 
       try
       {
          // First check done concurrently, to avoid synchronization and increase throughput
-         if (currentPage == null)
+         if (!paging)
          {
             return false;
          }
       }
       finally
       {
-         currentPageLock.readLock().unlock();
+         lock.readLock().unlock();
       }
 
-      writeLock.lock();
+      lock.writeLock().lock();
 
       try
       {
-         if (currentPage == null)
+         if (!paging)
          {
             return false;
          }
@@ -971,42 +964,17 @@
             if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
             {
                // Make sure nothing is currently validating or using currentPage
-               currentPageLock.writeLock().lock();
-               try
-               {
-                  openNewPage();
-
-                  // openNewPage will set currentPageSize to zero, we need to set it again
-                  currentPageSize.addAndGet(bytesToWrite);
-               }
-               finally
-               {
-                  currentPageLock.writeLock().unlock();
-               }
+               openNewPage();
             }
 
-            currentPageLock.readLock().lock();
-
-            try
-            {
-               currentPage.write(pagedMessage);
- 
-               if (sync)
-               {
-                  currentPage.sync();
-               }
-            }
-            finally
-            {
-               currentPageLock.readLock().unlock();
-            }
+            currentPage.write(pagedMessage);
          }
 
          return true;
       }
       finally
       {
-         writeLock.unlock();
+         lock.writeLock().unlock();
       }
 
    }
@@ -1177,51 +1145,11 @@
       return duplicateIdForPage;
    }
 
-   /**
-    * @return
-    */
-   private boolean isAddressFull(final long nextPageSize)
-   {
-      return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
-   }
+   
 
-   /**
-    * startDepaging and clearDepage needs to be atomic.
-    * We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
-    * @return true if the depage status was cleared
-    */
-   private synchronized boolean clearDepage()
-   {
-      final boolean addressFull = isAddressFull(getPageSizeBytes());
-
-      if (PagingStoreImpl.isTrace)
-      {
-         PagingStoreImpl.trace("Clear Depage on Address = " + getStoreName() +
-                               " addressSize = " +
-                               getAddressSize() +
-                               " addressMax " +
-                               maxSize +
-                               " isPaging = " +
-                               isPaging() +
-                               " addressFull = " +
-                               addressFull);
-      }
-
-      // It should stop the executor when the address is full or when there is nothing else to be depaged
-      if (addressFull || !isPaging())
-      {
-         depaging.set(false);
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-   }
-
    private void openNewPage() throws Exception
    {
-      currentPageLock.writeLock().lock();
+      lock.writeLock().lock();
 
       try
       {
@@ -1240,20 +1168,20 @@
          }
 
          currentPage = createPage(currentPageId);
-         
+
          LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-         
+
          currentPage.setLiveCache(pageCache);
 
          cursorProvider.addPageCache(pageCache);
-         
+
          currentPageSize.set(0);
 
          currentPage.open();
       }
       finally
       {
-         currentPageLock.writeLock().unlock();
+         lock.writeLock().unlock();
       }
    }
 
@@ -1282,39 +1210,39 @@
 
    // Inner classes -------------------------------------------------
 
-/*   private class DepageRunnable implements Runnable
-   {
-      private final Executor followingExecutor;
-
-      public DepageRunnable(final Executor followingExecutor)
+   /*   private class DepageRunnable implements Runnable
       {
-         this.followingExecutor = followingExecutor;
-      }
+         private final Executor followingExecutor;
 
-      public void run()
-      {
-         try
+         public DepageRunnable(final Executor followingExecutor)
          {
-            if (running)
+            this.followingExecutor = followingExecutor;
+         }
+
+         public void run()
+         {
+            try
             {
-               if (!isAddressFull(getPageSizeBytes()))
+               if (running)
                {
-                  readPage();
-               }
+                  if (!isAddressFull(getPageSizeBytes()))
+                  {
+                     readPage();
+                  }
 
-               // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
-               // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
-               // the lock and this would dead lock
-               if (running && !clearDepage())
-               {
-                  followingExecutor.execute(this);
+                  // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
+                  // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
+                  // the lock and this would dead lock
+                  if (running && !clearDepage())
+                  {
+                     followingExecutor.execute(this);
+                  }
                }
             }
+            catch (Throwable e)
+            {
+               PagingStoreImpl.log.error(e, e);
+            }
          }
-         catch (Throwable e)
-         {
-            PagingStoreImpl.log.error(e, e);
-         }
-      }
-   } */
+      } */
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.paging.impl;
 
-import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagingStore;
 
 /**
@@ -23,10 +22,4 @@
  */
 public interface TestSupportPageStore extends PagingStore
 {
-   void forceAnotherPage() throws Exception;
-
-   /** @return true if paging was started, or false if paging was already started before this call */
-   boolean startPaging() throws Exception;
-
-   Page getCurrentPage();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -280,8 +280,12 @@
       if (timeout == 0)
       {
          waitCallback.waitCompletion();
+         return true;
       }
-      return waitCallback.waitCompletion(timeout);
+      else
+      {
+         return waitCallback.waitCompletion(timeout);
+      }
    }
 
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-27 21:50:48 UTC (rev 9817)
@@ -132,6 +132,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -219,6 +220,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -283,7 +285,13 @@
          }
       }
       cursorProvider.printDebug();
+      
 
+      server.getStorageManager().waitOnOperations();
+      lookupPageStore(ADDRESS).flushExecutors();
+
+
+
       // needs to clear the context since we are using the same thread over two distinct servers
       // otherwise we will get the old executor on the factory
       OperationContextImpl.clearContext();
@@ -312,9 +320,15 @@
 
       OperationContextImpl.getContext(null).waitCompletion();
       ((PageCursorImpl)cursor).printDebug();
+      
+      lookupPageStore(ADDRESS).flushExecutors();
+      
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
 
       server.stop();
       createServer();
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -379,6 +393,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -450,6 +465,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -558,13 +574,13 @@
          if (i % 100 == 0)
             System.out.println("Paged " + i);
 
-         if (i >= NUM_MESSAGES * 2)
+         if (i >= NUM_MESSAGES * 2 - 1)
          {
 
             HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
 
             ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
-            msg.putIntProperty("key", i);
+            msg.putIntProperty("key", i + 1);
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
@@ -579,13 +595,47 @@
 
          assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
       }
+      
+      Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+      
+      assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
+      
+      cursor.ack(readMessage.a);
+      
+      server.getStorageManager().waitOnOperations();
 
+      pageStore.flushExecutors();
+      
+      assertFalse(pageStore.isPaging());
+
       server.stop();
       createServer();
-      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+      
+      assertFalse(pageStore.isPaging());
 
+      waitCleanup();
+      
+      assertFalse(lookupPageStore(ADDRESS).isPaging());
+
    }
 
+   /**
+    * @throws Exception
+    * @throws InterruptedException
+    */
+   private void waitCleanup() throws Exception, InterruptedException
+   {
+      // The cleanup is done asynchronously, so we need to wait some time
+      long timeout = System.currentTimeMillis() + 10000;
+      
+      while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+      {
+         Thread.sleep(100);
+      }
+
+      assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+   }
+
    public void testPrepareScenarios() throws Exception
    {
       PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -663,6 +713,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -707,6 +758,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }
@@ -754,6 +806,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
    }
 
@@ -815,6 +868,7 @@
 
       server.stop();
       createServer();
+      waitCleanup();
       assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
 
    }



More information about the hornetq-commits mailing list