Author: clebert.suconic(a)jboss.com
Date: 2012-01-04 09:23:01 -0500 (Wed, 04 Jan 2012)
New Revision: 11962
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7846
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-01-04 14:11:58 UTC (rev 11961)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-01-04 14:23:01 UTC (rev 11962)
@@ -1222,6 +1222,7 @@
try
{
+ boolean expired = false;
while (iter.hasNext())
{
MessageReference ref = iter.next();
@@ -1230,6 +1231,7 @@
if (ref.getMessage().isExpired())
{
deliveringCount.incrementAndGet();
+ expired = true;
expire(ref);
iter.remove();
refRemoved(ref);
@@ -1240,6 +1242,11 @@
log.warn("Error expiring reference " + ref, e);
}
}
+
+ if (expired && pageIterator != null && pageIterator.hasNext())
+ {
+ scheduleDepage();
+ }
}
finally
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2012-01-04 14:11:58 UTC (rev 11961)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2012-01-04 14:23:01 UTC (rev 11962)
@@ -23,7 +23,9 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
@@ -38,6 +40,8 @@
public class ExpiryLargeMessageTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(ExpiryLargeMessageTest.class);
+
// Constants -----------------------------------------------------
final SimpleString EXPIRY = new SimpleString("my-expiry");
@@ -129,25 +133,32 @@
producer.send(message);
}
+
+ session.close();
server.stop();
server.start();
+
+ Queue queueExpiry = server.locateQueue(EXPIRY);
+ Queue myQueue = server.locateQueue(MY_QUEUE);
sf = locator.createSessionFactory();
-
- session = sf.createSession(true, true, 0);
-
+
Thread.sleep(1500);
- // just to try expiring
- ClientConsumer cons = session.createConsumer(MY_QUEUE);
- assertNull(cons.receive(1000));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queueExpiry.getMessageCount() != numberOfMessages)
+ {
+ // What the Expiry Scan would be doing
+ myQueue.expireReferences();
+ Thread.sleep(50);
+ }
+
+ assertEquals(50, queueExpiry.getMessageCount());
- session.close();
-
session = sf.createSession(false, false);
- cons = session.createConsumer(EXPIRY);
+ ClientConsumer cons = session.createConsumer(EXPIRY);
session.start();
// Consume half of the messages to make sure all the messages are paging (on the second try)
@@ -167,7 +178,7 @@
cons = session.createConsumer(EXPIRY);
session.start();
- System.out.println("Trying " + rep);
+ log.info("Trying " + rep);
for (int i = 0; i < numberOfMessages / 2; i++)
{
ClientMessage message = cons.receive(5000);