Author: clebert.suconic(a)jboss.com
Date: 2010-10-16 00:00:15 -0400 (Sat, 16 Oct 2010)
New Revision: 9794
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Cleanup on paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -70,7 +70,20 @@
PageCursorProvider getCursorProvier();
void processReload() throws Exception;
+
+ /**
+ * Remove the first page from the Writing Queue.
+ * The file will still exist until Page.delete is called,
+ * so in case the system is reloaded, the same Page will be loaded back if delete was not called.
+ *
+ * @throws Exception
+ *
+ * Note: This should still be part of the interface, even though HornetQ only uses
through the
+ */
+ Page depage() throws Exception;
+
+
/**
* @return false if a thread was already started, or if not in page mode
* @throws Exception
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -26,6 +26,8 @@
public interface PageCache
{
Page getPage();
+
+ long getPageId();
int getNumberOfMessages();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
/**
@@ -37,6 +36,11 @@
void ack(PagePosition position) throws Exception;
void ackTx(Transaction tx, PagePosition position) throws Exception;
+ /**
+ *
+ * @return the first page in use, or Long.MAX_VALUE if none is in use
+ */
+ long getFirstPage();
// Reload operations
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -16,7 +16,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.server.ServerMessage;
/**
* The provider of Cursor for a given Address
@@ -49,13 +48,13 @@
* @param queueId The cursorID should be the same as the queueId associated for
persistance
* @return
*/
- PageCursor getCursor(long queueId);
+ PageCursor getPersistentCursor(long queueId);
/**
* Create a non persistent cursor, usually associated with browsing
* @return
*/
- PageCursor createCursor();
+ PageCursor createNonPersistentCursor();
Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos)
throws Exception;
@@ -65,6 +64,8 @@
void stop();
+ void scheduleCleanup();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -63,6 +63,11 @@
{
return page;
}
+
+ public long getPageId()
+ {
+ return page.getPageId();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getNumberOfMessages()
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -81,6 +81,11 @@
lock.readLock().unlock();
}
}
+
+ public long getPageId()
+ {
+ return page.getPageId();
+ }
public void lock()
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -189,8 +189,26 @@
installTXCallback(tx, position);
}
+
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
+ */
+ public long getFirstPage()
+ {
+ Long firstKey = consumedPages.firstKey();
+ if (firstKey == null)
+ {
+ return Long.MAX_VALUE;
+ }
+ else
+ {
+ return consumedPages.firstKey();
+ }
+ }
+
+
+ /* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
public synchronized void redeliver(final PagePosition position)
@@ -348,6 +366,11 @@
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
+ if (lastAckedPosition != null && lastAckedPosition.getPageNr() !=
pos.getPageNr())
+ {
+ // a different page is being acked, so schedule the cleanup check now instead of waiting for the page to complete
+ scheduleCleanupCheck();
+ }
this.lastAckedPosition = pos;
}
PageCursorInfo info = getPageInfo(pos);
@@ -386,6 +409,11 @@
*/
private void onPageDone(final PageCursorInfo info)
{
+ scheduleCleanupCheck();
+ }
+
+ private void scheduleCleanupCheck()
+ {
executor.execute(new Runnable()
{
@@ -458,18 +486,27 @@
@Override
public void afterCommit(final Transaction tx)
{
- synchronized (PageCursorImpl.this)
+ executor.execute(new Runnable()
{
- for (PageCursorInfo completePage : completedPages)
+
+ public void run()
{
- if (isTrace)
+ synchronized (PageCursorImpl.this)
{
- PageCursorImpl.trace("Removing page " +
completePage.getPageId());
+ for (PageCursorInfo completePage : completedPages)
+ {
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Removing page " +
completePage.getPageId());
+ }
+ System.out.println("Removing page " +
completePage.getPageId());
+ consumedPages.remove(completePage.getPageId());
+ }
}
- System.out.println("Removing page " +
completePage.getPageId());
- consumedPages.remove(completePage.getPageId());
+
+ cursorProvider.scheduleCleanup();
}
- }
+ });
}
});
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -13,8 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
@@ -28,6 +30,8 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -51,16 +55,20 @@
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
-
+
private final PagingManager pagingManager;
private final StorageManager storageManager;
private final ExecutorFactory executorFactory;
+ private final Executor executor;
+
private SoftValueHashMap<Long, PageCache> softCache = new
SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new
ConcurrentHashMap<Long, PageCursor>();
+
+ private ConcurrentSet<PageCursor> nonPersistentCursors = new
ConcurrentHashSet<PageCursor>();
// Static --------------------------------------------------------
@@ -74,6 +82,7 @@
this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
+ this.executor = executorFactory.getExecutor();
}
// Public --------------------------------------------------------
@@ -86,7 +95,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor getCursor(long cursorID)
+ public PageCursor getPersistentCursor(long cursorID)
{
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
@@ -105,9 +114,11 @@
/**
* this will create a non-persistent cursor
*/
- public PageCursor createCursor()
+ public PageCursor createNonPersistentCursor()
{
- return new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), 0);
+ PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager,
executorFactory.getExecutor(), 0);
+ nonPersistentCursors.add(cursor);
+ return cursor;
}
/* (non-Javadoc)
@@ -116,16 +127,15 @@
public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor,
PagePosition cursorPos) throws Exception
{
- while(true)
+ while (true)
{
Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
-
+
if (retPos == null)
{
return null;
}
- else
- if (retPos != null)
+ else if (retPos != null)
{
cursorPos = retPos.a;
if (retPos.b.getTransactionID() != 0)
@@ -133,7 +143,9 @@
PageTransactionInfo tx =
pagingManager.getTransaction(retPos.b.getTransactionID());
if (tx == null)
{
- log.warn("Couldn't locate page transaction " +
retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+ log.warn("Couldn't locate page transaction " +
retPos.b.getTransactionID() +
+ ", ignoring message on position " +
+ retPos.a);
cursor.positionIgnored(cursorPos);
}
else
@@ -151,7 +163,7 @@
}
}
}
-
+
private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -174,9 +186,9 @@
return null;
}
}
-
+
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-
+
if (serverMessage != null)
{
return new Pair<PagePosition, PagedMessage>(retPos,
cache.getMessage(retPos.getMessageNr()));
@@ -213,11 +225,10 @@
}
return cache;
}
-
+
public synchronized void addPageCache(PageCache cache)
{
- // TODO: remove the type cast here
- softCache.put((long)cache.getPage().getPageId(), cache);
+ softCache.put(cache.getPageId(), cache);
}
public synchronized int getCacheSize()
@@ -233,20 +244,53 @@
}
}
-
public void stop()
{
for (PageCursor cursor : activeCursors.values())
{
cursor.stop();
}
-
+
activeCursors.clear();
}
-
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
+ */
+ public void scheduleCleanup()
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ cleanup();
+ }
+ });
+ }
+
+ private void cleanup()
+ {
+ long minPage = getMinPageInUse();
+
+ try
+ {
+ System.out.println("MinPage = " + minPage + " firstPage = "
+ pagingStore.getFirstPage());
+ for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ {
+ Page page = pagingStore.depage();
+ System.out.println("Deleting files associated with page " + page);
+ page.delete();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Couldn't complete cleanup on paging", e);
+ }
+ }
+
public void printDebug()
{
- for (PageCache cache: softCache.values())
+ for (PageCache cache : softCache.values())
{
System.out.println("Cache " + cache);
}
@@ -264,6 +308,37 @@
// Private -------------------------------------------------------
+ private long getMinPageInUse()
+ {
+ ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+ synchronized (this)
+ {
+ cursorList.addAll(activeCursors.values());
+ cursorList.addAll(nonPersistentCursors);
+ }
+
+ if (cursorList.size() == 0)
+ {
+ return 0l;
+ }
+
+ long minPage = Long.MAX_VALUE;
+
+ System.out.println("CursorList : " + cursorList.size());
+
+ for (PageCursor cursor : cursorList)
+ {
+ long firstPage = cursor.getFirstPage();
+ if (firstPage < minPage)
+ {
+ minPage = firstPage;
+ }
+ }
+
+ return minPage;
+
+ }
+
private PageCache getPageCache(final long pageId)
{
try
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -23,17 +23,6 @@
*/
public interface TestSupportPageStore extends PagingStore
{
- /**
- * Remove the first page from the Writing Queue.
- * The file will still exist until Page.delete is called,
- * So, case the system is reloaded the same Page will be loaded back if delete is not
called.
- *
- * @throws Exception
- *
- * Note: This should still be part of the interface, even though HornetQ only uses
through the
- */
- Page depage() throws Exception;
-
void forceAnotherPage() throws Exception;
/** @return true if paging was started, or false if paging was already started before
this call */
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -1028,7 +1028,7 @@
{
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
- PageCursor cursor =
store.getCursorProvier().getCursor(encoding.queueID);
+ PageCursor cursor =
store.getCursorProvier().getPersistentCursor(encoding.queueID);
cursor.reloadACK(encoding.position);
}
else
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15
20:18:59 UTC (rev 9793)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-16
04:00:15 UTC (rev 9794)
@@ -106,7 +106,7 @@
public void testSimpleCursor() throws Exception
{
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 100;
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
@@ -114,7 +114,7 @@
PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(),
server.getExecutorFactory());
- PageCursor cursor = cursorProvider.createCursor();
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
Pair<PagePosition, PagedMessage> msg;
@@ -162,7 +162,7 @@
PageCursorProviderImpl cursorProvider =
(PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageCache firstPage = cursorProvider.getPageCache(new
PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
@@ -194,7 +194,7 @@
server.start();
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
@@ -227,7 +227,7 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 100 ; i++)
@@ -246,7 +246,7 @@
server.start();
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
for (int i = 10; i <= 20; i++)
{
@@ -277,7 +277,7 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -300,7 +300,7 @@
server.start();
- cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
@@ -336,7 +336,7 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -382,7 +382,7 @@
PageCursorProvider cursorProvider =
this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor = pageStore.getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor =
pageStore.getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -465,21 +465,6 @@
pageStore.page(messages, pgParameter.getTransactionID());
}
- public void testRollbackScenariosOnACK() throws Exception
- {
-
- }
-
- public void testReadRolledBackData() throws Exception
- {
-
- }
-
- public void testRedeliveryScenarios() throws Exception
- {
-
- }
-
public void testCleanupScenarios() throws Exception
{
// Validate the pages are being cleared (with multiple cursors)