Author: clebert.suconic(a)jboss.com
Date: 2010-11-11 23:51:39 -0500 (Thu, 11 Nov 2010)
New Revision: 9878
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
moving method moveNext from PageCursor to PageSubscription
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -54,8 +54,6 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
-
PagedMessage getMessage(PagePosition pos) throws Exception;
void processReload() throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -22,7 +22,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -33,7 +32,6 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -59,8 +57,6 @@
private final PagingStore pagingStore;
- private final PagingManager pagingManager;
-
private final StorageManager storageManager;
private final ExecutorFactory executorFactory;
@@ -80,7 +76,6 @@
final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
- this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
@@ -120,104 +115,6 @@
return activeCursors.get(cursorID);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
- */
- public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
- {
-
- while (true)
- {
- PagedReference retPos = internalGetNext(cursorPos, cursor);
-
- if (retPos == null)
- {
- return null;
- }
- else if (retPos != null)
- {
- cursorPos = retPos.getPosition();
-
- if (!routed(retPos.getPagedMessage(), cursor))
- {
- cursor.positionIgnored(cursorPos);
- }
- else
- if (retPos.getPagedMessage().getTransactionID() != 0)
- {
- PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
- if (tx == null)
- {
- log.warn("Couldn't locate page transaction " + retPos.getPagedMessage().getTransactionID() +
- ", ignoring message on position " +
- retPos.getPosition());
- cursor.positionIgnored(cursorPos);
- }
- else
- {
- if (!tx.deliverAfterCommit(cursor, cursorPos))
- {
- return retPos;
- }
- }
- }
- else
- {
- return retPos;
- }
- }
- }
- }
-
- private boolean routed(PagedMessage message, PageSubscription subs)
- {
- long id = subs.getId();
-
- for (long qid : message.getQueueIDs())
- {
- if (qid == id)
- {
- return true;
- }
- }
- return false;
- }
-
- private PagedReference internalGetNext(final PagePosition pos, final PageSubscription sub)
- {
- PagePosition retPos = pos.nextMessage();
-
- PageCache cache = getPageCache(pos);
-
- if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
- {
- retPos = pos.nextPage();
-
- cache = getPageCache(retPos);
-
- if (cache == null)
- {
- return null;
- }
-
- if (retPos.getMessageNr() >= cache.getNumberOfMessages())
- {
- return null;
- }
- }
-
- PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-
- if (serverMessage != null)
- {
- return newReference(retPos, serverMessage, sub);
- }
- else
- {
- return null;
- }
- }
-
public PagedMessage getMessage(final PagePosition pos) throws Exception
{
PageCache cache = getPageCache(pos);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -79,9 +79,9 @@
private final StorageManager store;
private final long cursorId;
-
+
private Queue queue;
-
+
private final boolean persistent;
private final Filter filter;
@@ -106,12 +106,12 @@
// Constructors --------------------------------------------------
public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
- final PagingStore pageStore,
- final StorageManager store,
- final Executor executor,
- final Filter filter,
- final long cursorId,
- final boolean persistent)
+ final PagingStore pageStore,
+ final StorageManager store,
+ final Executor executor,
+ final Filter filter,
+ final long cursorId,
+ final boolean persistent)
{
this.pageStore = pageStore;
this.store = store;
@@ -128,17 +128,17 @@
{
return queue;
}
-
+
public boolean isPaging()
{
return pageStore.isPaging();
}
-
+
public void setQueue(Queue queue)
{
this.queue = queue;
}
-
+
public void disableAutoCleanup()
{
autoCleanup = false;
@@ -209,7 +209,8 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+ PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+ " now since it's the current page");
}
else
{
@@ -261,8 +262,8 @@
if (consumedPages.remove(completePage.getPageId()) == null)
{
PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
- " from consumed pages on cursor for address " +
- pageStore.getAddress());
+ " from consumed pages on cursor for address " +
+ pageStore.getAddress());
}
}
}
@@ -303,30 +304,64 @@
do
{
- message = cursorProvider.getNext(this, tmpPosition);
+ message = internalGetNext(tmpPosition);
- boolean valid = true;
-
+
if (message == null)
{
- valid = false;
+ break;
}
- else
+
+ tmpPosition = message.getPosition();
+
+ boolean valid = true;
+ boolean ignored = false;
+
+ // Validate the scenarios where the message should be considered not valid even to be considered
+
+ // 1st... is it routed?
+
+ valid = routed(message.getPagedMessage());
+ if (!valid) ignored = true;
+
+ // 2nd ... if TX, is it committed?
+ if (valid && message.getPagedMessage().getTransactionID() != 0)
{
+ PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+ .getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+ ", ignoring message on position " +
+ message.getPosition());
+ valid = false;
+ ignored = true;
+ }
+ else
+ {
+ if (tx.deliverAfterCommit(this, message.getPosition()))
+ {
+ valid = false;
+ ignored = false;
+ }
+ }
+ }
+
+ // 3rd... was it previously removed?
+ if (valid)
+ {
// We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
- // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+ // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
// is being changed. That's why the false is passed as a parameter here
PageCursorInfo info = getPageInfo(message.getPosition(), false);
if (info != null && info.isRemoved(message.getPosition()))
{
- tmpPosition = message.getPosition();
valid = false;
}
}
+
if (valid)
{
- tmpPosition = message.getPosition();
-
match = match(message.getMessage());
if (!match)
@@ -334,12 +369,70 @@
processACK(message.getPosition());
}
}
+ else if (ignored)
+ {
+ positionIgnored(message.getPosition());
+ }
}
while (message != null && !match);
return message;
}
+ private PagedReference internalGetNext(final PagePosition pos)
+ {
+ PagePosition retPos = pos.nextMessage();
+
+ PageCache cache = cursorProvider.getPageCache(pos);
+
+ if (cache == null)
+ {
+ return null;
+ }
+
+ if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ retPos = pos.nextPage();
+
+ cache = cursorProvider.getPageCache(retPos);
+
+ if (cache == null)
+ {
+ return null;
+ }
+
+ if (retPos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ return null;
+ }
+ }
+
+ PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+
+ if (serverMessage != null)
+ {
+ return cursorProvider.newReference(retPos, serverMessage, this);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private boolean routed(PagedMessage message)
+ {
+ long id = getId();
+
+ for (long qid : message.getQueueIDs())
+ {
+ if (qid == id)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
*
*/
@@ -360,7 +453,7 @@
// The list is not ordered...
// This is only done at creation of the queue, so we just scan instead of keeping the list ordened
PagePosition retValue = null;
-
+
for (PagePosition pos : entry.getValue().acks)
{
System.out.println("Analizing " + pos);
@@ -369,9 +462,9 @@
retValue = pos;
}
}
-
+
System.out.println("Returning initial position " + retValue);
-
+
return retValue;
}
}
@@ -391,11 +484,10 @@
}
-
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
{
ackTx(tx, reference.getPosition());
-
+
PageTransactionInfo txInfo = getPageTransaction(reference);
if (txInfo != null)
{
@@ -403,7 +495,6 @@
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -416,7 +507,7 @@
txInfo.storeUpdate(this.store, pageStore.getPagingManager());
}
}
-
+
public void ack(final PagePosition position) throws Exception
{
// if we are dealing with a persistent cursor
@@ -465,7 +556,6 @@
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -481,7 +571,6 @@
}
}
-
/**
* Theres no need to synchronize this method as it's only called from journal load on startup
*/
@@ -602,7 +691,7 @@
{
return cursorId;
}
-
+
public boolean isPersistent()
{
return persistent;
@@ -619,7 +708,7 @@
Collections.sort(recoveredACK);
boolean first = true;
-
+
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
@@ -669,7 +758,7 @@
System.out.println(info);
}
}
-
+
private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
return getPageInfo(pos, true);
@@ -756,7 +845,7 @@
cursorTX.addPositionConfirmation(this, position);
}
-
+
private PageTransactionInfo getPageTransaction(final PagedReference reference)
{
if (reference.getPagedMessage().getTransactionID() != 0)
@@ -780,6 +869,7 @@
scheduleCleanupCheck();
}
}
+
// Inner classes -------------------------------------------------
/**
@@ -798,7 +888,7 @@
private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
private WeakReference<PageCache> cache;
-
+
private Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
// The page was live at the time of the creation
@@ -856,12 +946,12 @@
{
return pageId;
}
-
+
public boolean isRemoved(final PagePosition pos)
{
return removedReferences.contains(pos);
}
-
+
public void remove(final PagePosition position)
{
removedReferences.add(position);
@@ -875,10 +965,10 @@
if (isTrace)
{
PageSubscriptionImpl.trace("numberOfMessages = " + getNumberOfMessages() +
- " confirmed = " +
- (confirmed.get() + 1) +
- ", page = " +
- pageId);
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ ", page = " +
+ pageId);
}
// Negative could mean a bookmark on the first element for the page (example -1)
@@ -952,7 +1042,6 @@
}
}
}
-
class CursorIterator implements LinkedListIterator<PagedReference>
{
@@ -963,11 +1052,11 @@
private final LinkedListIterator<PagePosition> redeliveryIterator;
private volatile boolean isredelivery = false;
-
+
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
private volatile PagedReference cachedNext;
-
+
public CursorIterator()
{
synchronized (redeliveries)
@@ -975,7 +1064,6 @@
redeliveryIterator = redeliveries.iterator();
}
}
-
public void repeat()
{
@@ -1004,14 +1092,14 @@
*/
public synchronized PagedReference next()
{
-
+
if (cachedNext != null)
{
PagedReference retPos = cachedNext;
cachedNext = null;
return retPos;
}
-
+
try
{
synchronized (redeliveries)
@@ -1027,7 +1115,7 @@
isredelivery = false;
}
}
-
+
if (position == null)
{
position = getStartPosition();
@@ -1056,12 +1144,12 @@
{
return true;
}
-
+
if (!pageStore.isPaging())
{
return false;
}
-
+
cachedNext = next();
return cachedNext != null;
@@ -1083,5 +1171,4 @@
}
}
-
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -25,11 +25,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.DataConstants;
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-11 22:23:30 UTC (rev 9877)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-12 04:51:39 UTC (rev 9878)
@@ -26,7 +26,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -36,7 +35,6 @@
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -55,7 +53,7 @@
import org.hornetq.utils.LinkedListIterator;
/**
- * A PageCacheTest
+ * A PageCursorTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -726,10 +724,18 @@
}
assertNull(iterator.next());
+
+ server.getStorageManager().waitOnOperations();
server.stop();
createServer();
- waitCleanup();
+
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(500);
+ }
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}