[hornetq-commits] JBoss hornetq SVN: r9817 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 27 17:50:48 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-10-27 17:50:48 -0400 (Wed, 27 Oct 2010)
New Revision: 9817
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
cleanup page support
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -83,13 +83,22 @@
Page depage() throws Exception;
+ void forceAnotherPage() throws Exception;
+ Page getCurrentPage();
+
+
/**
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
boolean startDepaging();
+
+ /** @return true if paging was started, or false if paging was already started before this call */
+ boolean startPaging() throws Exception;
+ void stopPaging() throws Exception;
+
void addSize(int size);
void executeRunnableWhenMemoryAvailable(Runnable runnable);
@@ -103,4 +112,8 @@
*
*/
void unlock();
+
+ /** This is used mostly by tests.
+ * It waits for any pending runnable to finish its execution. */
+ void flushExecutors();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -40,6 +40,14 @@
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void close() throws Exception;
+ void scheduleCleanupCheck();
+
+ void cleanupEntries() throws Exception;
+
+ void disableAutoCleanup();
+
+ void enableAutoCleanup();
+
Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
@@ -85,4 +93,6 @@
* @return
*/
boolean isComplete(long minPage);
+
+ void flushExecutors();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -66,8 +66,13 @@
void processReload() throws Exception;
void stop();
+
+ void flushExecutors();
void scheduleCleanup();
+
+ // Perform the cleanup on the caller's thread (for startup and recovery)
+ void cleanup();
/**
* @param pageCursorImpl
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -68,10 +68,12 @@
System.out.println(message);
}
+ private volatile boolean autoCleanup = true;
+
private final StorageManager store;
private final long cursorId;
-
+
private final Filter filter;
private final PagingStore pageStore;
@@ -112,6 +114,16 @@
// Public --------------------------------------------------------
+ public void disableAutoCleanup()
+ {
+ autoCleanup = false;
+ }
+
+ public void enableAutoCleanup()
+ {
+ autoCleanup = true;
+ }
+
public PageCursorProvider getProvider()
{
return cursorProvider;
@@ -438,7 +450,7 @@
}
}
- public void stop()
+ public void flushExecutors()
{
Future future = new Future();
executor.execute(future);
@@ -448,6 +460,11 @@
}
}
+ public void stop()
+ {
+ flushExecutors();
+ }
+
public void printDebug()
{
printDebug(toString());
@@ -507,7 +524,10 @@
if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
{
// there's a different page being acked, we will do the check right away
- scheduleCleanupCheck();
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
}
lastAckedPosition = pos;
}
@@ -547,32 +567,38 @@
*/
private void onPageDone(final PageCursorInfo info)
{
- scheduleCleanupCheck();
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
}
- private void scheduleCleanupCheck()
+ public void scheduleCleanupCheck()
{
- executor.execute(new Runnable()
+ if (autoCleanup)
{
+ executor.execute(new Runnable()
+ {
- public void run()
- {
- try
+ public void run()
{
- cleanupPages();
+ try
+ {
+ cleanupEntries();
+ }
+ catch (Exception e)
+ {
+ PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
+ }
}
- catch (Exception e)
- {
- PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
- }
- }
- });
+ });
+ }
}
/**
* It will cleanup all the records for completed pages
* */
- private void cleanupPages() throws Exception
+ public void cleanupEntries() throws Exception
{
Transaction tx = new TransactionImpl(store);
@@ -687,7 +713,11 @@
@Override
public String toString()
{
- return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + confirmed;
+ return "PageCursorInfo::PageID=" + pageId +
+ " numberOfMessage = " +
+ numberOfMessages +
+ ", confirmed = " +
+ confirmed;
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -264,6 +264,9 @@
{
cursor.processReload();
}
+
+ cleanup();
+
}
public void stop()
@@ -289,6 +292,29 @@
}
+ public void flushExecutors()
+ {
+ for (PageCursor cursor : activeCursors.values())
+ {
+ cursor.flushExecutors();
+ }
+
+ for (PageCursor cursor : nonPersistentCursors)
+ {
+ cursor.flushExecutors();
+ }
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(10000))
+ {
+ log.warn("Waiting cursor provider " + this + " to finish executors");
+ }
+
+ }
+
public void close(PageCursor cursor)
{
if (cursor.getId() != 0)
@@ -318,7 +344,7 @@
});
}
- private void cleanup()
+ public void cleanup()
{
ArrayList<Page> depagedPages = new ArrayList<Page>();
@@ -328,17 +354,22 @@
{
try
{
+ if (!pagingStore.isStarted())
+ {
+ return;
+ }
+
ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
cursorList.addAll(activeCursors.values());
cursorList.addAll(nonPersistentCursors);
long minPage = checkMinPage(cursorList);
-
- if (minPage == pagingStore.getCurrentWritingPage())
+
+ if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
-
- for (PageCursor cursor: cursorList)
+
+ for (PageCursor cursor : cursorList)
{
if (!cursor.isComplete(minPage))
{
@@ -346,19 +377,59 @@
break;
}
}
-
+
if (complete)
{
- System.out.println("Depaging complete now. We can leave page state at this point!");
- // move every cursor away from the main page, clearing every cursor's old pages while only keeping a bookmark for the next page case it happens again
+
+ System.out.println("Disabling depage!");
+ pagingStore.forceAnotherPage();
+
+ Page currentPage = pagingStore.getCurrentPage();
+
+ try
+ {
+ // First step: Move every cursor to the next bookmarked page (that was just created)
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
+ }
+
+ storageManager.waitOnOperations();
+ }
+ finally
+ {
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.enableAutoCleanup();
+ }
+ }
+
+ pagingStore.stopPaging();
+
+ // This has to be called after we stopped paging
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.scheduleCleanupCheck();
+ }
+
}
}
for (long i = pagingStore.getFirstPage(); i < minPage; i++)
{
Page page = pagingStore.depage();
+ if (page == null)
+ {
+ break;
+ }
depagedPages.add(page);
}
+
+ if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 &&
+ pagingStore.getCurrentPage().getNumberOfMessages() == 0)
+ {
+ pagingStore.stopPaging();
+ }
}
catch (Exception ex)
{
@@ -412,11 +483,6 @@
*/
private long checkMinPage(List<PageCursor> cursorList)
{
- if (cursorList.size() == 0)
- {
- return 0l;
- }
-
long minPage = Long.MAX_VALUE;
for (PageCursor cursor : cursorList)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -113,18 +113,15 @@
private volatile int currentPageId;
private volatile Page currentPage;
+
+ private volatile boolean paging = false;
- private final ReentrantLock writeLock = new ReentrantLock();
-
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
-
+
private final PageCursorProvider cursorProvider;
- /**
- * We need to perform checks on currentPage with minimal locking
- * */
- private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile boolean running = false;
@@ -193,7 +190,7 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
// Post office could be null on the backup node
@@ -209,34 +206,34 @@
}
// Public --------------------------------------------------------
-
+
public String toString()
{
return "PagingStoreImpl(" + this.address + ")";
}
// PagingStore implementation ------------------------------------
-
+
public void lock()
{
- writeLock.lock();
+ lock.writeLock().lock();
}
-
+
public void unlock()
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
-
+
public PageCursorProvider getCursorProvier()
{
return cursorProvider;
}
-
+
public long getFirstPage()
{
return firstPageId;
}
-
+
public long getTopPage()
{
return currentPageId;
@@ -269,7 +266,7 @@
public boolean isPaging()
{
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
@@ -283,12 +280,12 @@
}
else
{
- return currentPage != null;
+ return paging;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -296,7 +293,7 @@
{
return numberOfPages;
}
-
+
public int getCurrentWritingPage()
{
return currentPageId;
@@ -322,7 +319,7 @@
public void sync() throws Exception
{
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
@@ -333,18 +330,17 @@
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public boolean startDepaging()
{
-
+
// Disabled for now
-
+
return false;
-
-
+
/*
if (!running)
{
@@ -384,7 +380,6 @@
currentPageLock.readLock().unlock();
} */
}
-
public void processReload() throws Exception
{
@@ -398,7 +393,7 @@
// HornetQComponent implementation
- public synchronized boolean isStarted()
+ public boolean isStarted()
{
return running;
}
@@ -407,21 +402,13 @@
{
if (running)
{
-
+
cursorProvider.stop();
running = false;
- Future future = new Future();
+ flushExecutors();
- executor.execute(future);
-
- if (!future.await(60000))
- {
- PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
- }
-
-
if (currentPage != null)
{
currentPage.close();
@@ -429,10 +416,24 @@
}
}
}
+
+ public void flushExecutors()
+ {
+ cursorProvider.flushExecutors();
+
+ Future future = new Future();
+ executor.execute(future);
+
+ if (!future.await(60000))
+ {
+ PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
+ }
+ }
+
public void start() throws Exception
{
- writeLock.lock();
+ lock.writeLock().lock();
try
{
@@ -448,80 +449,78 @@
}
else
{
- currentPageLock.writeLock().lock();
+ running = true;
+ firstPageId = Integer.MAX_VALUE;
- try
+ // There are no files yet on this Storage. We will just return it empty
+ if (fileFactory != null)
{
- running = true;
- firstPageId = Integer.MAX_VALUE;
- // There are no files yet on this Storage. We will just return it empty
- if (fileFactory != null)
- {
+ currentPageId = 0;
+ currentPage = null;
- currentPageId = 0;
- currentPage = null;
+ List<String> files = fileFactory.listFiles("page");
- List<String> files = fileFactory.listFiles("page");
+ numberOfPages = files.size();
- numberOfPages = files.size();
+ for (String fileName : files)
+ {
+ final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
- for (String fileName : files)
+ if (fileId > currentPageId)
{
- final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
-
- if (fileId > currentPageId)
- {
- currentPageId = fileId;
- }
-
- if (fileId < firstPageId)
- {
- firstPageId = fileId;
- }
+ currentPageId = fileId;
}
-
- if (currentPageId != 0)
+
+ if (fileId < firstPageId)
{
- currentPage = createPage(currentPageId);
- currentPage.open();
-
- List<PagedMessage> messages = currentPage.read();
-
- LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-
- for (PagedMessage msg : messages)
- {
- msg.initMessage(storageManager);
- pageCache.addLiveMessage(msg);
- }
-
- currentPage.setLiveCache(pageCache);
-
- currentPageSize.set(currentPage.getSize());
-
- cursorProvider.addPageCache(pageCache);
+ firstPageId = fileId;
}
-
- if (currentPage != null)
+ }
+
+ if (currentPageId != 0)
+ {
+ currentPage = createPage(currentPageId);
+ currentPage.open();
+
+ List<PagedMessage> messages = currentPage.read();
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ for (PagedMessage msg : messages)
{
-
- startPaging();
+ msg.initMessage(storageManager);
+ pageCache.addLiveMessage(msg);
}
+
+ currentPage.setLiveCache(pageCache);
+
+ currentPageSize.set(currentPage.getSize());
+
+ cursorProvider.addPageCache(pageCache);
}
+
+ // We will not mark it for paging if there's only a single empty file
+ if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
+ {
+ startPaging();
+ }
}
- finally
- {
- currentPageLock.writeLock().unlock();
- }
}
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
+
+ public void stopPaging()
+ {
+ lock.writeLock().lock();
+ paging = false;
+ lock.writeLock().unlock();
+ }
public boolean startPaging()
{
@@ -530,28 +529,30 @@
return false;
}
- // First check without any global locks.
- // (Faster)
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
- // Already paging, nothing to be done
- if (currentPage != null)
+ if (paging)
{
return false;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
- writeLock.lock();
+ lock.writeLock().lock();
try
{
+ if (paging)
+ {
+ return false;
+ }
+
if (currentPage == null)
{
try
@@ -565,17 +566,15 @@
PagingStoreImpl.log.warn("IO Error, impossible to start paging", e);
return false;
}
-
- return true;
}
- else
- {
- return false;
- }
+
+ paging = true;
+
+ return true;
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -594,22 +593,19 @@
}
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-
+
Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
- // To create the file
+ // To create the file
file.open();
file.position(0);
file.close();
-
return page;
}
- // TestSupportPageStore ------------------------------------------
-
public void forceAnotherPage() throws Exception
{
openNewPage();
@@ -625,9 +621,7 @@
* */
public Page depage() throws Exception
{
- writeLock.lock();
-
- currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+ lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
try
{
if (!running)
@@ -689,8 +683,7 @@
}
finally
{
- currentPageLock.writeLock().unlock();
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -922,26 +915,26 @@
}
// We need to ensure a read lock, as depage could change the paging state
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
// First check done concurrently, to avoid synchronization and increase throughput
- if (currentPage == null)
+ if (!paging)
{
return false;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
- writeLock.lock();
+ lock.writeLock().lock();
try
{
- if (currentPage == null)
+ if (!paging)
{
return false;
}
@@ -971,42 +964,17 @@
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
{
// Make sure nothing is currently validating or using currentPage
- currentPageLock.writeLock().lock();
- try
- {
- openNewPage();
-
- // openNewPage will set currentPageSize to zero, we need to set it again
- currentPageSize.addAndGet(bytesToWrite);
- }
- finally
- {
- currentPageLock.writeLock().unlock();
- }
+ openNewPage();
}
- currentPageLock.readLock().lock();
-
- try
- {
- currentPage.write(pagedMessage);
-
- if (sync)
- {
- currentPage.sync();
- }
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
+ currentPage.write(pagedMessage);
}
return true;
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -1177,51 +1145,11 @@
return duplicateIdForPage;
}
- /**
- * @return
- */
- private boolean isAddressFull(final long nextPageSize)
- {
- return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
- }
+
- /**
- * startDepaging and clearDepage needs to be atomic.
- * We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
- * @return true if the depage status was cleared
- */
- private synchronized boolean clearDepage()
- {
- final boolean addressFull = isAddressFull(getPageSizeBytes());
-
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Clear Depage on Address = " + getStoreName() +
- " addressSize = " +
- getAddressSize() +
- " addressMax " +
- maxSize +
- " isPaging = " +
- isPaging() +
- " addressFull = " +
- addressFull);
- }
-
- // It should stop the executor when the address is full or when there is nothing else to be depaged
- if (addressFull || !isPaging())
- {
- depaging.set(false);
- return true;
- }
- else
- {
- return false;
- }
- }
-
private void openNewPage() throws Exception
{
- currentPageLock.writeLock().lock();
+ lock.writeLock().lock();
try
{
@@ -1240,20 +1168,20 @@
}
currentPage = createPage(currentPageId);
-
+
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-
+
currentPage.setLiveCache(pageCache);
cursorProvider.addPageCache(pageCache);
-
+
currentPageSize.set(0);
currentPage.open();
}
finally
{
- currentPageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
@@ -1282,39 +1210,39 @@
// Inner classes -------------------------------------------------
-/* private class DepageRunnable implements Runnable
- {
- private final Executor followingExecutor;
-
- public DepageRunnable(final Executor followingExecutor)
+ /* private class DepageRunnable implements Runnable
{
- this.followingExecutor = followingExecutor;
- }
+ private final Executor followingExecutor;
- public void run()
- {
- try
+ public DepageRunnable(final Executor followingExecutor)
{
- if (running)
+ this.followingExecutor = followingExecutor;
+ }
+
+ public void run()
+ {
+ try
{
- if (!isAddressFull(getPageSizeBytes()))
+ if (running)
{
- readPage();
- }
+ if (!isAddressFull(getPageSizeBytes()))
+ {
+ readPage();
+ }
- // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
- // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
- // the lock and this would dead lock
- if (running && !clearDepage())
- {
- followingExecutor.execute(this);
+ // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
+ // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
+ // the lock and this would dead lock
+ if (running && !clearDepage())
+ {
+ followingExecutor.execute(this);
+ }
}
}
+ catch (Throwable e)
+ {
+ PagingStoreImpl.log.error(e, e);
+ }
}
- catch (Throwable e)
- {
- PagingStoreImpl.log.error(e, e);
- }
- }
- } */
+ } */
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
/**
@@ -23,10 +22,4 @@
*/
public interface TestSupportPageStore extends PagingStore
{
- void forceAnotherPage() throws Exception;
-
- /** @return true if paging was started, or false if paging was already started before this call */
- boolean startPaging() throws Exception;
-
- Page getCurrentPage();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -280,8 +280,12 @@
if (timeout == 0)
{
waitCallback.waitCompletion();
+ return true;
}
- return waitCallback.waitCompletion(timeout);
+ else
+ {
+ return waitCallback.waitCompletion(timeout);
+ }
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -132,6 +132,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -219,6 +220,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -283,7 +285,13 @@
}
}
cursorProvider.printDebug();
+
+ server.getStorageManager().waitOnOperations();
+ lookupPageStore(ADDRESS).flushExecutors();
+
+
+
// needs to clear the context since we are using the same thread over two distinct servers
// otherwise we will get the old executor on the factory
OperationContextImpl.clearContext();
@@ -312,9 +320,15 @@
OperationContextImpl.getContext(null).waitCompletion();
((PageCursorImpl)cursor).printDebug();
+
+ lookupPageStore(ADDRESS).flushExecutors();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
server.stop();
createServer();
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -379,6 +393,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -450,6 +465,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -558,13 +574,13 @@
if (i % 100 == 0)
System.out.println("Paged " + i);
- if (i >= NUM_MESSAGES * 2)
+ if (i >= NUM_MESSAGES * 2 - 1)
{
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i);
+ msg.putIntProperty("key", i + 1);
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
@@ -579,13 +595,47 @@
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
+
+ cursor.ack(readMessage.a);
+
+ server.getStorageManager().waitOnOperations();
+ pageStore.flushExecutors();
+
+ assertFalse(pageStore.isPaging());
+
server.stop();
createServer();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ assertFalse(pageStore.isPaging());
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
}
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ private void waitCleanup() throws Exception, InterruptedException
+ {
+ // The cleanup is done asynchronously, so we need to wait some time
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
+
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+ }
+
public void testPrepareScenarios() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -663,6 +713,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -707,6 +758,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -754,6 +806,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -815,6 +868,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
More information about the hornetq-commits
mailing list