[hornetq-commits] JBoss hornetq SVN: r10070 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Dec 22 18:11:41 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-22 18:11:40 -0500 (Wed, 22 Dec 2010)
New Revision: 10070
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging changes
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -36,6 +36,8 @@
void bookmark(PagePosition position) throws Exception;
PageSubscriptionCounter getCounter();
+
+ long getMessageCount();
long getId();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -28,6 +28,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
@@ -101,6 +102,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -171,6 +174,13 @@
confirmPosition(position);
}
+
+
+ public long getMessageCount()
+ {
+ return counter.getValue() - deliveredCount.get();
+ }
+
public PageSubscriptionCounter getCounter()
{
return counter;
@@ -959,6 +969,7 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
+ cursor.deliveredCount.decrementAndGet();
}
}
@@ -1195,6 +1206,7 @@
{
if (!isredelivery)
{
+ deliveredCount.incrementAndGet();
PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -661,7 +661,7 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
}
else
{
@@ -1639,6 +1639,11 @@
}
queue.deliveringCount.decrementAndGet();
+
+ if (queue.deliveringCount.get() < 0)
+ {
+ new Exception("DeliveringCount became negative").printStackTrace();
+ }
try
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -236,6 +236,8 @@
assertNull(consumer.receiveImmediate());
sessionCheck.close();
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
locator.close();
@@ -277,6 +279,7 @@
locator.close();
+ queue.getMessageCount();
//assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
More information about the hornetq-commits
mailing list