[hornetq-commits] JBoss hornetq SVN: r10675 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue May 17 00:37:19 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-17 00:37:18 -0400 (Tue, 17 May 2011)
New Revision: 10675
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging optimizations and test fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
@@ -121,6 +121,8 @@
private final Runnable deliverRunner = new DeliverRunner();
+ private volatile boolean depagePending = false;
+
private final Runnable depageRunner = new DepageRunner();
private final StorageManager storageManager;
@@ -837,10 +839,18 @@
{
if (expiryAddress != null)
{
+ if (isTrace)
+ {
+ log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+ }
move(expiryAddress, ref, true, false);
}
else
{
+ if (isTrace)
+ {
+ log.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
+ }
acknowledge(ref);
}
}
@@ -1605,12 +1615,20 @@
private void scheduleDepage()
{
- pageSubscription.getExecutor().execute(depageRunner);
+ if (!depagePending)
+ {
+ if (isTrace)
+ {
+ log.trace("Scheduling depage for queue " + this.getName());
+ }
+ depagePending = true;
+ pageSubscription.getExecutor().execute(depageRunner);
+ }
}
private void depage()
{
- if (paused || pageIterator == null || consumerList.isEmpty())
+ if (paused || pageIterator == null)
{
return;
}
@@ -1618,13 +1636,15 @@
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
// System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ int depaged = 0;
while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
+ depaged++;
PagedReference reference = pageIterator.next();
addTail(reference, false);
pageIterator.remove();
}
- // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+ log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
deliverAsync();
}
@@ -1898,6 +1918,10 @@
{
if (reference.getMessage().isExpired())
{
+ if (isTrace)
+ {
+ log.trace("Reference " + reference + " is expired");
+ }
reference.handled();
try
@@ -2191,6 +2215,7 @@
{
try
{
+ depagePending = false;
depage();
}
catch (Exception e)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 04:37:18 UTC (rev 10675)
@@ -3844,13 +3844,12 @@
session.start();
- ClientConsumer cons = session.createConsumer(ADDRESS);
+ ClientConsumer consAddr = session.createConsumer(ADDRESS);
- assertNull(cons.receive(1000));
+ assertNull(consAddr.receive(1000));
- cons.close();
- cons = session.createConsumer("DLA");
+ ClientConsumer cons = session.createConsumer("DLA");
for (int i = 0; i < 500; i++)
{
More information about the hornetq-commits mailing list