Author: clebert.suconic(a)jboss.com
Date: 2010-11-13 00:38:12 -0500 (Sat, 13 Nov 2010)
New Revision: 9885
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing sort order
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-13 05:38:12 UTC (rev 9885)
@@ -23,7 +23,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-13 05:38:12 UTC (rev 9885)
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +49,6 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Future;
-import org.hornetq.utils.LinkedListImpl;
import org.hornetq.utils.LinkedListIterator;
/**
@@ -99,7 +99,7 @@
private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
- private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
+ private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
// Static --------------------------------------------------------
@@ -464,7 +464,7 @@
{
synchronized (redeliveries)
{
- redeliveries.addTail(position);
+ redeliveries.add(position);
}
}
@@ -961,20 +961,16 @@
private PagePosition lastOperation = null;
- private final LinkedListIterator<PagePosition> redeliveryIterator;
-
private volatile boolean isredelivery = false;
+ private volatile PagedReference lastRedelivery = null;
+
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
private volatile PagedReference cachedNext;
public CursorIterator()
{
- synchronized (redeliveries)
- {
- redeliveryIterator = redeliveries.iterator();
- }
}
public void repeat()
@@ -983,7 +979,7 @@
{
synchronized (redeliveries)
{
- redeliveryIterator.repeat();
+ cachedNext = lastRedelivery;
}
}
else
@@ -1045,16 +1041,20 @@
{
synchronized (redeliveries)
{
- if (redeliveryIterator.hasNext())
+ PagePosition redelivery = redeliveries.poll();
+
+ if (redelivery != null)
{
// There's a redelivery pending, we will get it out of that pool instead
isredelivery = true;
- PagedReference redeliveredMsg = getReference(redeliveryIterator.next());
+ PagedReference redeliveredMsg = getReference(redelivery);
+ lastRedelivery = redeliveredMsg;
return redeliveredMsg;
}
else
{
+ lastRedelivery = null;
isredelivery = false;
}
@@ -1077,7 +1077,9 @@
valid = routed(message.getPagedMessage());
if (!valid)
+ {
ignored = true;
+ }
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() != 0)
@@ -1171,7 +1173,10 @@
*/
public void remove()
{
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ if (!isredelivery)
+ {
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ }
}
/* (non-Javadoc)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-13 05:38:12 UTC (rev 9885)
@@ -14,9 +14,7 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -43,7 +41,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
@@ -53,7 +50,6 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
@@ -1049,19 +1045,14 @@
{
numberOfPages++;
- currentPageId++;
+ int tmpCurrentPageId = currentPageId + 1;
- if (currentPageId < firstPageId)
- {
- firstPageId = currentPageId;
- }
-
if (currentPage != null)
{
currentPage.close();
}
- currentPage = createPage(currentPageId);
+ currentPage = createPage(tmpCurrentPageId);
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
@@ -1072,6 +1063,13 @@
currentPageSize.set(0);
currentPage.open();
+
+ currentPageId = tmpCurrentPageId;
+
+ if (currentPageId < firstPageId)
+ {
+ firstPageId = currentPageId;
+ }
}
finally
{
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-13 02:43:47 UTC (rev 9884)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-13 05:38:12 UTC (rev 9885)
@@ -966,7 +966,7 @@
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++)
- {
+ {
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
assertEquals(i, msg.getIntProperty("count").intValue());