[hornetq-commits] JBoss hornetq SVN: r9779 - in branches/Branch_New_Paging: src/main/org/hornetq/core/persistence and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 12 14:17:08 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-10-12 14:17:07 -0400 (Tue, 12 Oct 2010)
New Revision: 9779
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Implementing cleanup after a full page was consumed
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor.impl;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -26,6 +27,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -35,7 +37,9 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.impl.TransactionImpl;
/**
* A PageCursorImpl
@@ -48,6 +52,7 @@
public class PageCursorImpl implements PageCursor
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageCursorImpl.class);
// Attributes ----------------------------------------------------
@@ -58,14 +63,14 @@
private final PagingStore pageStore;
private final PageCursorProvider cursorProvider;
-
+
private final Executor executor;
private volatile PagePosition lastPosition;
private List<PagePosition> recoveredACK;
- private SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+ private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -112,7 +117,7 @@
boolean match = false;
Pair<PagePosition, ServerMessage> message = null;
-
+
do
{
message = cursorProvider.getAfter(lastPosition);
@@ -128,7 +133,7 @@
processACK(message.a);
}
}
-
+
}
while (message != null && !match);
@@ -140,14 +145,20 @@
*/
public void ack(final PagePosition position) throws Exception
{
- store.storeCursorAcknowledge(cursorId, position);
+
+ // if we are dealing with a persistent cursor
+ if (cursorId != 0)
+ {
+ store.storeCursorAcknowledge(cursorId, position);
+ }
+
store.afterCompleteOperations(new IOAsyncTask()
{
-
- public void onError(int errorCode, String errorMessage)
+
+ public void onError(final int errorCode, final String errorMessage)
{
}
-
+
public void done()
{
processACK(position);
@@ -157,7 +168,11 @@
public void ackTx(final Transaction tx, final PagePosition position) throws Exception
{
- store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ // if the cursor is persistent
+ if (cursorId != 0)
+ {
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ }
installTXCallback(tx, position);
}
@@ -167,7 +182,7 @@
*/
public synchronized void redeliver(final PagePosition position)
{
- this.redeliveries.add(position);
+ redeliveries.add(position);
}
/**
@@ -175,8 +190,12 @@
*/
public void reloadACK(final PagePosition position)
{
- internalAdd(position);
+ if (recoveredACK == null)
+ {
+ recoveredACK = new LinkedList<PagePosition>();
+ }
+ recoveredACK.add(position);
}
/* (non-Javadoc)
@@ -190,7 +209,7 @@
public void processReload() throws Exception
{
- if (this.recoveredACK != null)
+ if (recoveredACK != null)
{
System.out.println("********** processing reload!!!!!!!");
Collections.sort(recoveredACK);
@@ -199,7 +218,7 @@
for (PagePosition pos : recoveredACK)
{
PageCursorInfo positions = getPageInfo(pos);
-
+
positions.addACK(pos);
lastPosition = pos;
@@ -230,7 +249,8 @@
else
{
// The reference was ignored. But we must take a count from the reference count
- // otherwise the page will never be deleted hence we would never leave paging even if everything was consumed
+ // otherwise the page will never be deleted hence we would never leave paging even if
+ // everything was consumed
positions.confirmed.incrementAndGet();
}
}
@@ -252,25 +272,23 @@
* @param page
* @return
*/
- private PageCursorInfo getPageInfo(PagePosition pos)
+ private PageCursorInfo getPageInfo(final PagePosition pos)
{
PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
-
+
if (pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
consumedPages.put(pos.getPageNr(), pageInfo);
}
-
+
return pageInfo;
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
-
protected boolean match(final ServerMessage message)
{
@@ -279,88 +297,149 @@
}
// Private -------------------------------------------------------
-
+
// To be called only after the ACK has been processed and guaranteed to be on storage
// The only exception is on non-storage events such as non-matching messages
private void processACK(final PagePosition pos)
{
PageCursorInfo info = getPageInfo(pos);
-
+
info.addACK(pos);
}
/**
- * @param committedACK
+ * @param tx
+ * @param position
*/
- private void internalAdd(final PagePosition committedACK)
+ private void installTXCallback(final Transaction tx, final PagePosition position)
{
- if (recoveredACK == null)
+ if (position.getRecordID() > 0)
{
- recoveredACK = new LinkedList<PagePosition>();
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
}
- recoveredACK.add(committedACK);
- }
+ PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
- /**
- * @param tx
- * @param position
- */
- private void installTXCallback(Transaction tx, PagePosition position)
- {
- // It needs to persist, otherwise the cursor will return to the fist page position
- tx.setContainsPersistent();
-
- PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
-
if (cursorTX == null)
{
cursorTX = new PageCursorTX();
- tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS,cursorTX);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
tx.addOperation(cursorTX);
}
-
+
cursorTX.addPositionConfirmation(this, position);
-
-
+
}
-
- // A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
- private void onPageDone(PageCursorInfo info)
+
+ /**
+ * A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
+ * @param info
+ */
+ private void onPageDone(final PageCursorInfo info)
{
System.out.println("Page " + info.getPageId() + " has completed");
+
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ cleanupPages();
+ }
+ catch (Exception e)
+ {
+ PageCursorImpl.log.warn("Error on cleaning up cursor pages");
+ }
+ }
+ });
}
+ /**
+ * It will clean up all the records for completed pages
+ * */
+ private void cleanupPages() throws Exception
+ {
+ Transaction tx = new TransactionImpl(store);
+
+ boolean persist = false;
+
+ final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+ // First get the completed pages
+ synchronized (this)
+ {
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+ {
+ if (entry.getValue().isDone())
+ {
+ completedPages.add(entry.getValue());
+ }
+ }
+ }
+
+ for (PageCursorInfo info : completedPages)
+ {
+ for (PagePosition pos : info.acks)
+ {
+ if (pos.getRecordID() > 0)
+ {
+ store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+ if (!persist)
+ {
+ // only need to set it once
+ tx.setContainsPersistent();
+ persist = true;
+ }
+ }
+ }
+ }
+
+ tx.addOperation(new TransactionOperationAbstract()
+ {
+
+ @Override
+ public void afterCommit(final Transaction tx)
+ {
+ synchronized (PageCursorImpl.this)
+ {
+ for (PageCursorInfo completePage : completedPages)
+ {
+ System.out.println("Removing page " + completePage.getPageId());
+ consumedPages.remove(completePage.getPageId());
+ }
+ }
+ }
+ });
+
+ tx.commit();
+
+ }
+
// Inner classes -------------------------------------------------
-
-
+
private class PageCursorInfo
{
// Number of messages existent on this page
private final int numberOfMessages;
-
+
private final long pageId;
-
+
// Confirmed ACKs on this page
private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
-
- // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions
+
+ // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
+ // expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
-
+
public PageCursorInfo(final long pageId, final int numberOfMessages)
{
this.pageId = pageId;
this.numberOfMessages = numberOfMessages;
}
- /**
- * @return the numberOfMessages
- */
- public int getNumberOfMessages()
- {
- return numberOfMessages;
- }
-
public boolean isDone()
{
return numberOfMessages == confirmed.get();
@@ -373,91 +452,91 @@
{
return pageId;
}
-
+
public void addACK(final PagePosition posACK)
{
if (posACK.getRecordID() > 0)
{
// We store these elements for later cleanup
- this.acks.add(posACK);
+ acks.add(posACK);
}
-
+
if (numberOfMessages == confirmed.incrementAndGet())
{
- PageCursorImpl.this.onPageDone(this);
+ onPageDone(this);
}
}
-
- }
-
+
+ }
+
static class PageCursorTX implements TransactionOperation
{
HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
-
- public void addPositionConfirmation(PageCursorImpl cursor, PagePosition position)
+
+ public void addPositionConfirmation(final PageCursorImpl cursor, final PagePosition position)
{
List<PagePosition> list = pendingPositions.get(cursor);
-
+
if (list == null)
{
list = new LinkedList<PagePosition>();
pendingPositions.put(cursor, list);
}
-
+
list.add(position);
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
*/
- public void beforePrepare(Transaction tx) throws Exception
+ public void beforePrepare(final Transaction tx) throws Exception
{
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
*/
- public void afterPrepare(Transaction tx)
+ public void afterPrepare(final Transaction tx)
{
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
*/
- public void beforeCommit(Transaction tx) throws Exception
+ public void beforeCommit(final Transaction tx) throws Exception
{
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
*/
- public void afterCommit(Transaction tx)
+ public void afterCommit(final Transaction tx)
{
- for (Entry<PageCursorImpl, List<PagePosition>> entry : this.pendingPositions.entrySet())
+ for (Entry<PageCursorImpl, List<PagePosition>> entry : pendingPositions.entrySet())
{
PageCursorImpl cursor = entry.getKey();
-
+
List<PagePosition> positions = entry.getValue();
-
+
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
}
-
+
}
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
*/
- public void beforeRollback(Transaction tx) throws Exception
+ public void beforeRollback(final Transaction tx) throws Exception
{
}
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
*/
- public void afterRollback(Transaction tx)
+ public void afterRollback(final Transaction tx)
{
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -116,6 +116,8 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
+
+ void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception;
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -631,7 +631,17 @@
position.setRecordID(ackID);
messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+ */
+ public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, ackID);
+ }
+
+
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -470,4 +470,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+ */
+ public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.transaction;
+
+/**
+ * Just a helper, when you don't want to implement all the methods on a transaction operation.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public abstract class TransactionOperationAbstract implements TransactionOperation
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+
+ }
+
+ /** After prepare shouldn't throw any exception. Any verification has to be done on before prepare */
+ public void afterPrepare(Transaction tx)
+ {
+
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
+ public void afterCommit(Transaction tx)
+ {
+ };
+
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ };
+
+ /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
+ public void afterRollback(Transaction tx)
+ {
+ };
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-12 18:17:07 UTC (rev 9779)
@@ -1543,6 +1543,15 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+ */
+ public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list