[hornetq-commits] JBoss hornetq SVN: r9813 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Oct 25 20:08:05 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-10-25 20:08:05 -0400 (Mon, 25 Oct 2010)
New Revision: 9813
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Clear depage logic
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -93,4 +93,14 @@
void addSize(int size);
void executeRunnableWhenMemoryAvailable(Runnable runnable);
+
+ /** This method will hold and producer, but it wait operations to finish before locking (write lock) */
+ void lock();
+
+ /**
+ *
+ * Call this method using the same thread used by the last call of {@link PagingStore#lock()}
+ *
+ */
+ void unlock();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -79,4 +79,10 @@
void redeliver(PagePosition position);
void printDebug();
+
+ /**
+ * @param minPage
+ * @return
+ */
+ boolean isComplete(long minPage);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -271,6 +271,15 @@
processACK(position);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
+ */
+ public boolean isComplete(long page)
+ {
+ PageCursorInfo info = consumedPages.get(page);
+ return info != null && info.isDone();
+ }
+
/**
* All the data associated with the cursor should go away here
*/
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -14,7 +14,9 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -66,7 +68,7 @@
private final Executor executor;
- private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+ private Map<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
@@ -85,7 +87,7 @@
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
- }
+ }
// Public --------------------------------------------------------
@@ -93,16 +95,21 @@
{
return pagingStore;
}
-
- public PageCursor createPersistentCursor(long cursorID, Filter filter)
+
+ public synchronized PageCursor createPersistentCursor(long cursorID, Filter filter)
{
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
-
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+
+ activeCursor = new PageCursorImpl(this,
+ pagingStore,
+ storageManager,
+ executorFactory.getExecutor(),
+ filter,
+ cursorID);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -110,7 +117,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor getPersistentCursor(long cursorID)
+ public synchronized PageCursor getPersistentCursor(long cursorID)
{
return activeCursors.get(cursorID);
}
@@ -118,9 +125,14 @@
/**
* this will create a non-persistent cursor
*/
- public PageCursor createNonPersistentCursor(Filter filter)
+ public synchronized PageCursor createNonPersistentCursor(Filter filter)
{
- PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, 0);
+ PageCursor cursor = new PageCursorImpl(this,
+ pagingStore,
+ storageManager,
+ executorFactory.getExecutor(),
+ filter,
+ 0);
nonPersistentCursors.add(cursor);
return cursor;
}
@@ -230,14 +242,20 @@
return cache;
}
- public synchronized void addPageCache(PageCache cache)
+ public void addPageCache(PageCache cache)
{
- softCache.put(cache.getPageId(), cache);
+ synchronized (softCache)
+ {
+ softCache.put(cache.getPageId(), cache);
+ }
}
- public synchronized int getCacheSize()
+ public int getCacheSize()
{
- return softCache.size();
+ synchronized (softCache)
+ {
+ return softCache.size();
+ }
}
public void processReload() throws Exception
@@ -302,24 +320,70 @@
private void cleanup()
{
- long minPage = getMinPageInUse();
+ ArrayList<Page> depagedPages = new ArrayList<Page>();
- try
+ pagingStore.lock();
+
+ synchronized (this)
{
- for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ try
{
- Page page = pagingStore.depage();
- if (page != null)
+ ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+ cursorList.addAll(activeCursors.values());
+ cursorList.addAll(nonPersistentCursors);
+
+ long minPage = checkMinPage(cursorList);
+
+ if (minPage == pagingStore.getCurrentWritingPage())
{
- System.out.println("Deleting " + page);
- page.delete();
+ boolean complete = true;
+
+ for (PageCursor cursor: cursorList)
+ {
+ if (!cursor.isComplete(minPage))
+ {
+ complete = false;
+ break;
+ }
+ }
+
+ if (complete)
+ {
+ System.out.println("Depaging complete now. We can leave page state at this point!");
+ // move every cursor away from the main page, clearing every cursor's old pages while only keeping a bookmark for the next page case it happens again
+ }
}
+
+ for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ {
+ Page page = pagingStore.depage();
+ depagedPages.add(page);
+ }
}
+ catch (Exception ex)
+ {
+ log.warn("Couldn't complete cleanup on paging", ex);
+ return;
+ }
+ finally
+ {
+ pagingStore.unlock();
+ }
}
+
+ try
+ {
+ for (Page depagedPage : depagedPages)
+ {
+ depagedPage.delete();
+ }
+ }
catch (Exception ex)
{
log.warn("Couldn't complete cleanup on paging", ex);
+ return;
}
+
}
public void printDebug()
@@ -343,15 +407,11 @@
// Private -------------------------------------------------------
- private long getMinPageInUse()
+ /**
+ * This method is synchronized because we want it to be atomic with the cursors being used
+ */
+ private long checkMinPage(List<PageCursor> cursorList)
{
- ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
- synchronized (this)
- {
- cursorList.addAll(activeCursors.values());
- cursorList.addAll(nonPersistentCursors);
- }
-
if (cursorList.size() == 0)
{
return 0l;
@@ -378,7 +438,7 @@
{
boolean needToRead = false;
PageCache cache = null;
- synchronized (this)
+ synchronized (softCache)
{
if (pageId > pagingStore.getCurrentWritingPage())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -217,6 +217,16 @@
// PagingStore implementation ------------------------------------
+ public void lock()
+ {
+ writeLock.lock();
+ }
+
+ public void unlock()
+ {
+ writeLock.unlock();
+ }
+
public PageCursorProvider getCursorProvier()
{
return cursorProvider;
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -272,6 +272,7 @@
for (int i = 0; i < 1000; i++)
{
+ System.out.println("Reading Msg : " + i);
Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
More information about the hornetq-commits
mailing list