Author: clebert.suconic(a)jboss.com
Date: 2010-11-10 21:09:12 -0500 (Wed, 10 Nov 2010)
New Revision: 9870
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
update page transactions
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -42,12 +42,16 @@
void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx)
throws Exception;
- void storeUpdate(StorageManager storageManager, PagingManager pagingManager,
Transaction tx, int depages) throws Exception;
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager,
Transaction tx) throws Exception;
+
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws
Exception;
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager
pagingManager);
void increment();
+
+ void increment(int size);
int getNumberOfMessages();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -31,6 +31,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -164,133 +165,115 @@
ack(position);
}
- class CursorIterator implements LinkedListIterator<PagedReference>
+ public void scheduleCleanupCheck()
{
- private PagePosition position = null;
-
- private PagePosition lastOperation = null;
-
- private final LinkedListIterator<PagePosition> redeliveryIterator;
-
- private volatile boolean isredelivery = false;
-
- /** next element taken on hasNext test.
- * it has to be delivered on next next operation */
- private volatile PagedReference cachedNext;
-
- public CursorIterator()
+ if (autoCleanup)
{
- synchronized (redeliveries)
+ executor.execute(new Runnable()
{
- redeliveryIterator = redeliveries.iterator();
- }
- }
-
- public void repeat()
- {
- if (isredelivery)
- {
- synchronized (redeliveries)
+ public void run()
{
- redeliveryIterator.repeat();
+ try
+ {
+ cleanupEntries();
+ }
+ catch (Exception e)
+ {
+ PageSubscriptionImpl.log.warn("Error on cleaning up cursor
pages", e);
+ }
}
- }
- else
- {
- if (lastOperation == null)
- {
- position = null;
- }
- else
- {
- position = lastOperation;
- }
- }
+ });
}
+ }
- /* (non-Javadoc)
- * @see java.util.Iterator#next()
- */
- public synchronized PagedReference next()
+ /**
+ * It will cleanup all the records for completed pages
+ * */
+ public void cleanupEntries() throws Exception
+ {
+ Transaction tx = new TransactionImpl(store);
+
+ boolean persist = false;
+
+ final ArrayList<PageCursorInfo> completedPages = new
ArrayList<PageCursorInfo>();
+
+ // First get the completed pages using a lock
+ synchronized (this)
{
-
- if (cachedNext != null)
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
- PagedReference retPos = cachedNext;
- cachedNext = null;
- return retPos;
- }
-
- try
- {
- synchronized (redeliveries)
+ PageCursorInfo info = entry.getValue();
+ if (info.isDone() && !info.isPendingDelete() &&
lastAckedPosition != null)
{
- if (redeliveryIterator.hasNext())
+ if (entry.getKey() == lastAckedPosition.getPageNr())
{
- // There's a redelivery pending, we will get it out of that pool
instead
- isredelivery = true;
- return getReference(redeliveryIterator.next());
+ PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() + " now since it's the current page");
}
else
{
- isredelivery = false;
+ info.setPendingDelete();
+ completedPages.add(entry.getValue());
}
}
-
- if (position == null)
- {
- position = getStartPosition();
- }
+ }
+ }
- PagedReference nextPos = moveNext(position);
- if (nextPos != null)
+ for (int i = 0; i < completedPages.size(); i++)
+ {
+ PageCursorInfo info = completedPages.get(i);
+
+ for (PagePosition pos : info.acks)
+ {
+ if (pos.getRecordID() > 0)
{
- lastOperation = position;
- position = nextPos.getPosition();
+ store.deleteCursorAcknowledgeTransactional(tx.getID(),
pos.getRecordID());
+ if (!persist)
+ {
+ // only need to set it once
+ tx.setContainsPersistent();
+ persist = true;
+ }
}
- return nextPos;
}
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
}
- /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be
using next and hasNext as well.
- * It would be a rare race condition but I would prefer avoiding that scenario */
- public synchronized boolean hasNext()
+ tx.addOperation(new TransactionOperationAbstract()
{
- // if an unbehaved program called hasNext twice before next, we only cache it
once.
- if (cachedNext != null)
+
+ @Override
+ public void afterCommit(final Transaction tx)
{
- return true;
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ synchronized (PageSubscriptionImpl.this)
+ {
+ for (PageCursorInfo completePage : completedPages)
+ {
+ if (isTrace)
+ {
+ PageSubscriptionImpl.trace("Removing page " +
completePage.getPageId());
+ }
+ if (consumedPages.remove(completePage.getPageId()) == null)
+ {
+ PageSubscriptionImpl.log.warn("Couldn't remove page
" + completePage.getPageId() +
+ " from consumed pages on cursor
for address " +
+ pageStore.getAddress());
+ }
+ }
+ }
+
+ cursorProvider.scheduleCleanup();
+ }
+ });
}
-
- if (!pageStore.isPaging())
- {
- return false;
- }
-
- cachedNext = next();
+ });
- return cachedNext != null;
- }
+ tx.commit();
- /* (non-Javadoc)
- * @see java.util.Iterator#remove()
- */
- public void remove()
- {
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.LinkedListIterator#close()
- */
- public void close()
- {
- }
}
private PagedReference getReference(PagePosition pos) throws Exception
@@ -396,12 +379,41 @@
return new PagePositionImpl(pageStore.getFirstPage(), -1);
}
+ public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+ {
+ // if the cursor is persistent
+ if (persistent)
+ {
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ }
+ installTXCallback(tx, position);
+
+ }
+
+
+ public void ackTx(final Transaction tx, final PagedReference reference) throws
Exception
+ {
+ ackTx(tx, reference.getPosition());
+
+ PageTransactionInfo txInfo = getPageTransaction(reference);
+ if (txInfo != null)
+ {
+ txInfo.storeUpdate(store, pageStore.getPagingManager(), tx);
+ }
+ }
+
+
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void ack(final PagedReference position) throws Exception
+ public void ack(final PagedReference reference) throws Exception
{
- ack(position.getPosition());
+ ack(reference.getPosition());
+ PageTransactionInfo txInfo = getPageTransaction(reference);
+ if (txInfo != null)
+ {
+ txInfo.storeUpdate(this.store, pageStore.getPagingManager());
+ }
}
public void ack(final PagePosition position) throws Exception
@@ -426,23 +438,6 @@
});
}
- public void ackTx(final Transaction tx, final PagePosition position) throws Exception
- {
- // if the cursor is persistent
- if (persistent)
- {
- store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
- }
- installTXCallback(tx, position);
-
- }
-
-
- public void ackTx(final Transaction tx, final PagedReference position) throws
Exception
- {
- ackTx(tx, position.getPosition());
- }
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
*/
@@ -743,6 +738,18 @@
cursorTX.addPositionConfirmation(this, position);
}
+
+ private PageTransactionInfo getPageTransaction(final PagedReference reference)
+ {
+ if (reference.getPagedMessage().getTransactionID() != 0)
+ {
+ return
pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+ }
+ else
+ {
+ return null;
+ }
+ }
/**
* A callback from the PageCursorInfo. It will be called when all the messages on a
page have been acked
@@ -755,118 +762,6 @@
scheduleCleanupCheck();
}
}
-
- public void scheduleCleanupCheck()
- {
- if (autoCleanup)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- try
- {
- cleanupEntries();
- }
- catch (Exception e)
- {
- PageSubscriptionImpl.log.warn("Error on cleaning up cursor
pages", e);
- }
- }
- });
- }
- }
-
- /**
- * It will cleanup all the records for completed pages
- * */
- public void cleanupEntries() throws Exception
- {
- Transaction tx = new TransactionImpl(store);
-
- boolean persist = false;
-
- final ArrayList<PageCursorInfo> completedPages = new
ArrayList<PageCursorInfo>();
-
- // First get the completed pages using a lock
- synchronized (this)
- {
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
- {
- PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete() &&
lastAckedPosition != null)
- {
- if (entry.getKey() == lastAckedPosition.getPageNr())
- {
- PageSubscriptionImpl.trace("We can't clear page " +
entry.getKey() + " now since it's the current page");
- }
- else
- {
- info.setPendingDelete();
- completedPages.add(entry.getValue());
- }
- }
- }
- }
-
- for (int i = 0; i < completedPages.size(); i++)
- {
- PageCursorInfo info = completedPages.get(i);
-
- for (PagePosition pos : info.acks)
- {
- if (pos.getRecordID() > 0)
- {
- store.deleteCursorAcknowledgeTransactional(tx.getID(),
pos.getRecordID());
- if (!persist)
- {
- // only need to set it once
- tx.setContainsPersistent();
- persist = true;
- }
- }
- }
- }
-
- tx.addOperation(new TransactionOperationAbstract()
- {
-
- @Override
- public void afterCommit(final Transaction tx)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- synchronized (PageSubscriptionImpl.this)
- {
- for (PageCursorInfo completePage : completedPages)
- {
- if (isTrace)
- {
- PageSubscriptionImpl.trace("Removing page " +
completePage.getPageId());
- }
- if (consumedPages.remove(completePage.getPageId()) == null)
- {
- PageSubscriptionImpl.log.warn("Couldn't remove page
" + completePage.getPageId() +
- " from consumed pages on cursor
for address " +
- pageStore.getAddress());
- }
- }
- }
-
- cursorProvider.scheduleCleanup();
- }
- });
- }
- });
-
- tx.commit();
-
- }
-
// Inner classes -------------------------------------------------
/**
@@ -1038,6 +933,137 @@
}
}
+ }
+
+ class CursorIterator implements LinkedListIterator<PagedReference>
+ {
+ private PagePosition position = null;
+
+ private PagePosition lastOperation = null;
+
+ private final LinkedListIterator<PagePosition> redeliveryIterator;
+
+ private volatile boolean isredelivery = false;
+
+ /** next element taken on hasNext test.
+ * it has to be delivered on next next operation */
+ private volatile PagedReference cachedNext;
+
+ public CursorIterator()
+ {
+ synchronized (redeliveries)
+ {
+ redeliveryIterator = redeliveries.iterator();
+ }
+ }
+
+
+ public void repeat()
+ {
+ if (isredelivery)
+ {
+ synchronized (redeliveries)
+ {
+ redeliveryIterator.repeat();
+ }
+ }
+ else
+ {
+ if (lastOperation == null)
+ {
+ position = null;
+ }
+ else
+ {
+ position = lastOperation;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ public synchronized PagedReference next()
+ {
+
+ if (cachedNext != null)
+ {
+ PagedReference retPos = cachedNext;
+ cachedNext = null;
+ return retPos;
+ }
+
+ try
+ {
+ synchronized (redeliveries)
+ {
+ if (redeliveryIterator.hasNext())
+ {
+ // There's a redelivery pending, we will get it out of that pool
instead
+ isredelivery = true;
+ return getReference(redeliveryIterator.next());
+ }
+ else
+ {
+ isredelivery = false;
+ }
+ }
+
+ if (position == null)
+ {
+ position = getStartPosition();
+ }
+
+ PagedReference nextPos = moveNext(position);
+ if (nextPos != null)
+ {
+ lastOperation = position;
+ position = nextPos.getPosition();
+ }
+ return nextPos;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be
using next and hasNext as well.
+ * It would be a rare race condition but I would prefer avoiding that scenario */
+ public synchronized boolean hasNext()
+ {
+ // if an unbehaved program called hasNext twice before next, we only cache it
once.
+ if (cachedNext != null)
+ {
+ return true;
+ }
+
+ if (!pageStore.isPaging())
+ {
+ return false;
+ }
+
+ cachedNext = next();
+
+ return cachedNext != null;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ */
+ public void remove()
+ {
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.LinkedListIterator#close()
+ */
+ public void close()
+ {
+ }
}
+
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -13,12 +13,15 @@
package org.hornetq.core.paging.impl;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
@@ -27,6 +30,8 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.DataConstants;
/**
@@ -98,10 +103,7 @@
{
log.warn("Can't delete page transaction id=" + this.recordID);
}
- }
-
- if (sizeAfterUpdate == 0 && pagingManager != null)
- {
+
pagingManager.removeTransaction(this.transactionID);
}
}
@@ -110,6 +112,11 @@
{
numberOfMessages.incrementAndGet();
}
+
+ public void increment(final int size)
+ {
+ numberOfMessages.addAndGet(size);
+ }
public int getNumberOfMessages()
{
@@ -158,40 +165,37 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager,
org.hornetq.core.transaction.Transaction, int)
*/
- public void storeUpdate(final StorageManager storageManager, final PagingManager
pagingManager, final Transaction tx, final int depages) throws Exception
+ public void storeUpdate(final StorageManager storageManager, final PagingManager
pagingManager, final Transaction tx) throws Exception
{
- storageManager.updatePageTransaction(tx.getID(), this, depages);
+ UpdatePageTXOperation pgtxUpdate =
(UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
- final PageTransactionInfo pgToUpdate = this;
+ if (pgtxUpdate == null)
+ {
+ pgtxUpdate = new UpdatePageTXOperation(storageManager, pagingManager);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE, pgtxUpdate);
+ tx.addOperation(pgtxUpdate);
+ }
- tx.addOperation(new TransactionOperation()
+ pgtxUpdate.addUpdate(this);
+ }
+
+ public void storeUpdate(final StorageManager storageManager, final PagingManager
pagingManager) throws Exception
+ {
+ storageManager.updatePageTransaction(this, 1);
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- public void beforeRollback(Transaction tx) throws Exception
+ public void onError(int errorCode, String errorMessage)
{
}
- public void beforePrepare(Transaction tx) throws Exception
+ public void done()
{
+ PageTransactionInfoImpl.this.onUpdate(1, storageManager, pagingManager);
}
-
- public void beforeCommit(Transaction tx) throws Exception
- {
- }
-
- public void afterRollback(Transaction tx)
- {
- }
-
- public void afterPrepare(Transaction tx)
- {
- }
-
- public void afterCommit(Transaction tx)
- {
- pgToUpdate.onUpdate(depages, storageManager, pagingManager);
- }
});
}
+
+
public boolean isCommit()
{
@@ -260,4 +264,68 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+
+ static class UpdatePageTXOperation extends TransactionOperationAbstract
+ {
+ private HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new
HashMap<PageTransactionInfo, AtomicInteger>();
+
+ private boolean stored = false;
+
+ private final StorageManager storageManager;
+
+ private final PagingManager pagingManager;
+
+ public UpdatePageTXOperation(final StorageManager storageManager, final
PagingManager pagingManager)
+ {
+ this.storageManager = storageManager;
+ this.pagingManager = pagingManager;
+ }
+
+ public void addUpdate(PageTransactionInfo info)
+ {
+ AtomicInteger counter = countsToUpdate.get(info);
+
+ if (counter == null)
+ {
+ counter = new AtomicInteger(0);
+ countsToUpdate.put(info, counter);
+ }
+
+ counter.incrementAndGet();
+ }
+
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ storeUpdates(tx);
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ storeUpdates(tx);
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ for (Map.Entry<PageTransactionInfo, AtomicInteger> entry :
countsToUpdate.entrySet())
+ {
+ entry.getKey().onUpdate(entry.getValue().intValue(), storageManager,
pagingManager);
+ }
+ }
+
+ private void storeUpdates(Transaction tx) throws Exception
+ {
+ if (!stored)
+ {
+ stored = true;
+ for (Map.Entry<PageTransactionInfo, AtomicInteger> entry :
countsToUpdate.entrySet())
+ {
+ storageManager.updatePageTransaction(tx.getID(), entry.getKey(),
entry.getValue().get());
+ }
+ }
+ }
+
+
+
+ }
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -14,7 +14,6 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,8 +50,8 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ExecutorFactory;
@@ -700,59 +699,6 @@
* @return
* @throws Exception
*/
- protected boolean readPage() throws Exception
- {
- Page page = depage();
-
- // It's important that only depage should happen while locked
- // or we would be holding a lock for a long time
- // The reading (IO part) should happen outside of any locks
-
- if (page == null)
- {
- return false;
- }
-
- page.open();
-
- List<PagedMessage> messages = null;
-
- try
- {
- messages = page.read();
- }
- finally
- {
- try
- {
- page.close();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- if (onDepage(page.getPageId(), storeName, messages))
- {
- if (page.delete())
- {
- // DuplicateCache could be null during replication
- // however the deletes on the journal will happen through replicated journal
- if (duplicateCache != null)
- {
- duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
- }
- }
-
- return true;
- }
- else
- {
- return false;
- }
-
- }
-
private Queue<OurRunnable> onMemoryFreedRunnables = new
ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
@@ -951,7 +897,7 @@
Transaction tx = ctx.getTransaction();
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx),
getTransactionID(tx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx),
getTransactionID(tx, listCtx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -991,7 +937,7 @@
return ids;
}
- private long getTransactionID(Transaction tx) throws Exception
+ private long getTransactionID(final Transaction tx, final RouteContextList listCtx)
throws Exception
{
if (tx == null)
{
@@ -999,34 +945,42 @@
}
else
{
- if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+ PageTransactionInfo pgTX = (PageTransactionInfo)
tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ if (pgTX == null)
{
- PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ pgTX = new PageTransactionInfoImpl(tx.getID());
System.out.println("Creating pageTransaction " +
pgTX.getTransactionID());
- storageManager.storePageTransaction(tx.getID(), pgTX);
pagingManager.addTransaction(pgTX);
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
- tx.addOperation(new FinishPageMessageOperation());
+ tx.addOperation(new FinishPageMessageOperation(pgTX));
tx.setContainsPersistent();
}
+ pgTX.increment(listCtx.getNumberOfQueues());
+
return tx.getID();
}
}
- private static class FinishPageMessageOperation implements TransactionOperation
+ private class FinishPageMessageOperation implements TransactionOperation
{
+ private final PageTransactionInfo pageTransaction;
+
+ private boolean stored = false;
+ public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+ {
+ this.pageTransaction = pageTransaction;
+ }
+
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we
can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
- PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
if (pageTransaction != null)
{
pageTransaction.commit();
@@ -1039,8 +993,6 @@
public void afterRollback(final Transaction tx)
{
- PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
if (tx.getState() == State.PREPARED && pageTransaction != null)
{
pageTransaction.rollback();
@@ -1049,11 +1001,22 @@
public void beforeCommit(final Transaction tx) throws Exception
{
+ storePageTX(tx);
}
public void beforePrepare(final Transaction tx) throws Exception
{
+ storePageTX(tx);
}
+
+ private void storePageTX(final Transaction tx) throws Exception
+ {
+ if (!stored)
+ {
+ storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ stored = true;
+ }
+ }
public void beforeRollback(final Transaction tx) throws Exception
{
@@ -1067,157 +1030,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private boolean onDepage(final int pageId, final SimpleString address, final
List<PagedMessage> pagedMessages) throws Exception
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Depaging....");
- }
-
- if (pagedMessages.size() == 0)
- {
- // nothing to be done on this case.
- return true;
- }
-
- // Depage has to be done atomically, in case of failure it should be
- // back to where it was
-
- byte[] duplicateIdForPage = generateDuplicateID(pageId);
-
- Transaction depageTransaction = new TransactionImpl(storageManager);
-
- // DuplicateCache could be null during replication
- if (duplicateCache != null)
- {
- if (duplicateCache.contains(duplicateIdForPage))
- {
- log.warn("Page " + pageId +
- " had been processed already but the file wasn't removed as
a crash happened. Ignoring this page");
- return true;
- }
-
- duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
- }
-
- depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE,
Boolean.valueOf(true));
-
- HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new
HashMap<PageTransactionInfo, AtomicInteger>();
-
- for (PagedMessage pagedMessage : pagedMessages)
- {
- ServerMessage message = pagedMessage.getMessage();
-
- if (message.isLargeMessage())
- {
- LargeServerMessage largeMsg = (LargeServerMessage)message;
- if (!largeMsg.isFileExists())
- {
- PagingStoreImpl.log.warn("File for large message " +
largeMsg.getMessageID() +
- " doesn't exist, so ignoring depage for
this large message");
- continue;
- }
- }
-
- final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
- PageTransactionInfo pageUserTransaction = null;
- AtomicInteger countPageTX = null;
-
- if (transactionIdDuringPaging >= 0)
- {
- pageUserTransaction =
pagingManager.getTransaction(transactionIdDuringPaging);
-
- if (pageUserTransaction == null)
- {
- // This is not supposed to happen
- PagingStoreImpl.log.warn("Transaction " +
pagedMessage.getTransactionID() +
- " used during paging not found");
- continue;
- }
- else
- {
- countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
- if (countPageTX == null)
- {
- countPageTX = new AtomicInteger();
- pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
- }
-
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
-
- while (running)
- {
- // This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the
commit/rollback may never come, delaying
- // the shutdown of the server
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Waiting pageTransaction to
complete");
- }
- }
-
- if (!running)
- {
- break;
- }
-
- if (!pageUserTransaction.isCommit())
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Rollback was called after prepare,
ignoring message " + message);
- }
- continue;
- }
- }
-
- }
-
- postOffice.route(message, depageTransaction, false);
-
- // This means the page is duplicated. So we need to ignore this
- if (depageTransaction.getState() == State.ROLLBACK_ONLY)
- {
- break;
- }
-
- // Update information about transactions
- // This needs to be done after routing because of duplication detection
- if (pageUserTransaction != null && message.isDurable())
- {
- countPageTX.incrementAndGet();
- }
- }
-
- if (!running)
- {
- depageTransaction.rollback();
- return false;
- }
-
- for (Map.Entry<PageTransactionInfo, AtomicInteger> entry :
pageTransactionsToUpdate.entrySet())
- {
- // This will set the journal transaction to commit;
- depageTransaction.setContainsPersistent();
-
- entry.getKey().storeUpdate(storageManager, this.pagingManager,
depageTransaction, entry.getValue().intValue());
- }
-
- depageTransaction.commit();
-
- storageManager.waitOnOperations();
-
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Depage committed, running = " + running);
- }
-
- return true;
- }
-
- /**
+ /**
* @param pageId
* @return
*/
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -142,6 +142,8 @@
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws
Exception;
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int
depage) throws Exception;
+
+ void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws
Exception;
void deletePageTransactional(long recordID) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -606,6 +606,16 @@
depages));
}
+ public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int
depages) throws Exception
+ {
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
+ }
+
public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -479,4 +479,13 @@
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo,
int)
+ */
+ public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage)
throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -24,6 +24,8 @@
*/
public interface RouteContextList
{
+
+ int getNumberOfQueues();
List<Queue> getDurableQueues();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -1272,7 +1272,8 @@
if (msgsToDeliver > 0)
{
- System.out.println("Depaging " + msgsToDeliver + "
messages");
+ //System.out.println("Depaging " + msgsToDeliver + "
messages");
+ System.out.println("Depage " + msgsToDeliver + " now.. there are
msgRef = " + messageReferences.size() + " scheduled = " +
getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
int nmessages = 0;
while (nmessages < msgsToDeliver && pageIterator.hasNext())
@@ -1281,7 +1282,13 @@
addTail(pageIterator.next(), false);
pageIterator.remove();
}
+
+ System.out.println("Depaged " + nmessages);
}
+ else
+ {
+ System.out.println("Depaging not being done now.. there are msgRef = "
+ messageReferences.size() + " scheduled = " + getScheduledCount() + "
concurrentQueue.size() = " + concurrentQueue.size());
+ }
deliverAsync();
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -124,6 +124,11 @@
private List<Queue> durableQueue = new ArrayList<Queue>(1);
private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
+
+ public int getNumberOfQueues()
+ {
+ return durableQueue.size() + nonDurableQueue.size();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.RouteContextList#getDurableQueues()
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -24,8 +24,9 @@
*/
public class TransactionPropertyIndexes
{
- public static final int IS_DEPAGE = 3;
+ public static final int PAGE_TRANSACTION_UPDATE = 4;
+
public static final int PAGE_TRANSACTION = 5;
public static final int REFS_OPERATION = 6;
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -1012,37 +1012,6 @@
syncNonTransactional);
}
- protected boolean page(ServerMessage message,
org.hornetq.core.server.RoutingContext ctx, org.hornetq.core.server.RouteContextList
listCtx, boolean sync) throws Exception
- {
- boolean paged = super.page(message, ctx, listCtx, sync);
-
- if (paged)
- {
-
- if (countDepage.incrementAndGet() == 1)
- {
- countDepage.set(0);
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- while (isStarted() && readPage());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- });
- }
- }
-
- return paged;
- }
-
public boolean startDepaging()
{
// do nothing, we are hacking depage right in between paging
@@ -1306,8 +1275,9 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500000);
+ ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
+ System.out.println("Received " + i);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-11
01:11:10 UTC (rev 9869)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-11-11
02:09:12 UTC (rev 9870)
@@ -1553,6 +1553,15 @@
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo,
int)
+ */
+ public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage)
throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory