Author: clebert.suconic(a)jboss.com
Date: 2010-10-19 17:28:15 -0400 (Tue, 19 Oct 2010)
New Revision: 9795
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing cleanup after closing a cursor
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -29,8 +29,15 @@
// Cursor query operations --------------------------------------
+ // To be called before the server is down
void stop();
+ /** It will be 0 if non persistent cursor */
+ public long getId();
+
+ // To be called when the cursor is closed for good. Most likely when the queue is deleted
+ void close() throws Exception;
+
Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -66,6 +66,15 @@
void scheduleCleanup();
+ /**
+ * @param pageCursorImpl
+ */
+ void close(PageCursor pageCursorImpl);
+
+ // to be used on tests -------------------------------------------
+
+ int getCacheSize();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -24,6 +24,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
@@ -107,6 +108,11 @@
// Public --------------------------------------------------------
+ public PageCursorProvider getProvider()
+ {
+ return this.cursorProvider;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -196,10 +202,9 @@
*/
public long getFirstPage()
{
- Long firstKey = consumedPages.firstKey();
- if (firstKey == null)
+ if (consumedPages.isEmpty())
{
- return Long.MAX_VALUE;
+ return 0;
}
else
{
@@ -221,7 +226,6 @@
*/
public void reloadACK(final PagePosition position)
{
- System.out.println("reloading " + position);
if (recoveredACK == null)
{
recoveredACK = new LinkedList<PagePosition>();
@@ -246,8 +250,95 @@
{
processACK(position);
}
+
+
+ /**
+ * All the data associated with the cursor should go away here
+ */
+ public void close() throws Exception
+ {
+ final long tx = store.generateUniqueID();
+
+ final ArrayList<Exception> ex = new ArrayList<Exception>();
+
+ final AtomicBoolean isPersistent = new AtomicBoolean(false);
+
+ // We can't delete the records at the caller's thread
+ // because an executor may be holding the synchronized on PageCursorImpl
+ // what would lead to a dead lock
+ // so, we delete it inside the executor also
+ // and wait for the result
+ // The caller will be treating eventual IO exceptions and dispatching to the original thread's caller
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ synchronized (PageCursorImpl.this)
+ {
+ for (PageCursorInfo cursor : consumedPages.values())
+ {
+ for (PagePosition info : cursor.acks)
+ {
+ if (info.getRecordID() != 0)
+ {
+ isPersistent.set(true);
+ store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ex.add(e);
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(5000))
+ {
+ log.warn("Timeout on waiting cursor " + this + " to be closed");
+ }
+
+
+ if (isPersistent.get())
+ {
+ // Another reason to perform the commit at the main thread is because the OperationContext may only send the result to the client when
+ // the IO on commit is done
+ if (ex.size() == 0)
+ {
+ store.commit(tx);
+ }
+ else
+ {
+ store.rollback(tx);
+ throw ex.get(0);
+ }
+ }
+
+ cursorProvider.close(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getId()
+ */
+ public long getId()
+ {
+ return cursorId;
+ }
+
+
+
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -317,7 +408,10 @@
{
Future future = new Future();
executor.execute(future);
- future.await(1000);
+ while (!future.await(1000))
+ {
+ log.warn("Waiting page cursor to finish executors - " + this);
+ }
}
public void printDebug()
@@ -340,7 +434,6 @@
if (pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
- System.out.println("Number of Messages = " + cache.getNumberOfMessages());
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
@@ -451,7 +544,7 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- System.out.println("We can't clear page " + entry.getKey() + " now since it's the current page");
+ trace("We can't clear page " + entry.getKey() + " now since it's the current page");
}
else
{
@@ -499,7 +592,7 @@
{
PageCursorImpl.trace("Removing page " + completePage.getPageId());
}
- System.out.println("Removing page " + completePage.getPageId());
+ trace("Removing page " + completePage.getPageId());
consumedPages.remove(completePage.getPageId());
}
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -33,6 +33,7 @@
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -67,7 +68,7 @@
private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
-
+
private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
// Static --------------------------------------------------------
@@ -83,7 +84,7 @@
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
- }
+ }
// Public --------------------------------------------------------
@@ -251,14 +252,42 @@
cursor.stop();
}
- activeCursors.clear();
+ for (PageCursor cursor : nonPersistentCursors)
+ {
+ cursor.stop();
+ }
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(10000))
+ {
+ log.warn("Waiting cursor provider " + this + " to finish executors");
+ }
+
}
+ public void close(PageCursor cursor)
+ {
+ if (cursor.getId() != 0)
+ {
+ activeCursors.remove(cursor.getId());
+ }
+ else
+ {
+ nonPersistentCursors.remove(cursor);
+ }
+
+ scheduleCleanup();
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
*/
public void scheduleCleanup()
{
+
executor.execute(new Runnable()
{
public void run()
@@ -274,17 +303,18 @@
try
{
- System.out.println("MinPage = " + minPage + " firstPage = " + pagingStore.getFirstPage());
for (long i = pagingStore.getFirstPage(); i < minPage; i++)
{
Page page = pagingStore.depage();
- System.out.println("Deleting files associated with page " + page);
- page.delete();
+ if (page != null)
+ {
+ page.delete();
+ }
}
}
- catch (Exception e)
+ catch (Exception ex)
{
- log.warn("Couldn't complete cleanup on paging", e);
+ log.warn("Couldn't complete cleanup on paging", ex);
}
}
@@ -316,15 +346,13 @@
cursorList.addAll(activeCursors.values());
cursorList.addAll(nonPersistentCursors);
}
-
+
if (cursorList.size() == 0)
{
return 0l;
}
long minPage = Long.MAX_VALUE;
-
- System.out.println("CursorList : " + cursorList.size());
for (PageCursor cursor : cursorList)
{
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -56,6 +56,7 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
/**
*
@@ -210,6 +211,11 @@
}
// Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "PagingStoreImpl(" + this.address + ")";
+ }
// PagingStore implementation ------------------------------------
@@ -393,30 +399,26 @@
{
if (running)
{
+
+ cursorProvider.stop();
+
running = false;
- final CountDownLatch latch = new CountDownLatch(1);
+ Future future = new Future();
- executor.execute(new Runnable()
- {
- public void run()
- {
- latch.countDown();
- }
- });
+ executor.execute(future);
- if (!latch.await(60, TimeUnit.SECONDS))
+ if (!future.await(60000))
{
PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
}
+
if (currentPage != null)
{
currentPage.close();
currentPage = null;
}
-
- cursorProvider.stop();
}
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -470,6 +470,51 @@
// Validate the pages are being cleared (with multiple cursors)
}
+
+ public void testCloseNonPersistentConsumer() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
+ PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
+
+ Pair<PagePosition, PagedMessage> msg;
+
+ int key = 0;
+ while ((msg = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ msg = cursor2.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ }
+
+ assertSame(cursor2.getProvider(), cursorProvider);
+
+ cursor2.close();
+
+ server.stop();
+
+ }
+
+
public void testLeavePageStateAndRestart() throws Exception
{
// Validate the cursor are working fine when all the pages are gone, and then paging being restarted