Author: clebert.suconic(a)jboss.com
Date: 2011-05-17 00:37:18 -0400 (Tue, 17 May 2011)
New Revision: 10675
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging optimizations and test fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
@@ -121,6 +121,8 @@
private final Runnable deliverRunner = new DeliverRunner();
+ private volatile boolean depagePending = false;
+
private final Runnable depageRunner = new DepageRunner();
private final StorageManager storageManager;
@@ -837,10 +839,18 @@
{
if (expiryAddress != null)
{
+ if (isTrace)
+ {
+ log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+ }
move(expiryAddress, ref, true, false);
}
else
{
+ if (isTrace)
+ {
+ log.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
+ }
acknowledge(ref);
}
}
@@ -1605,12 +1615,20 @@
private void scheduleDepage()
{
- pageSubscription.getExecutor().execute(depageRunner);
+ if (!depagePending)
+ {
+ if (isTrace)
+ {
+ log.trace("Scheduling depage for queue " + this.getName());
+ }
+ depagePending = true;
+ pageSubscription.getExecutor().execute(depageRunner);
+ }
}
private void depage()
{
- if (paused || pageIterator == null || consumerList.isEmpty())
+ if (paused || pageIterator == null)
{
return;
}
@@ -1618,13 +1636,15 @@
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
// System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ int depaged = 0;
while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
+ depaged++;
PagedReference reference = pageIterator.next();
addTail(reference, false);
pageIterator.remove();
}
- // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+ log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
deliverAsync();
}
@@ -1898,6 +1918,10 @@
{
if (reference.getMessage().isExpired())
{
+ if (isTrace)
+ {
+ log.trace("Reference " + reference + " is expired");
+ }
reference.handled();
try
@@ -2191,6 +2215,7 @@
{
try
{
+ depagePending = false;
depage();
}
catch (Exception e)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 04:37:18 UTC (rev 10675)
@@ -3844,13 +3844,12 @@
session.start();
- ClientConsumer cons = session.createConsumer(ADDRESS);
+ ClientConsumer consAddr = session.createConsumer(ADDRESS);
- assertNull(cons.receive(1000));
+ assertNull(consAddr.receive(1000));
- cons.close();
- cons = session.createConsumer("DLA");
+ ClientConsumer cons = session.createConsumer("DLA");
for (int i = 0; i < 500; i++)
{
Show replies by date