Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 21:46:06 -0500 (Mon, 06 Feb 2012)
New Revision: 12087
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843 - fixing expiry on Paging
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-06
14:29:24 UTC (rev 12086)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07
02:46:06 UTC (rev 12087)
@@ -1223,8 +1223,10 @@
try
{
boolean expired = false;
+ boolean hasElements = false;
while (iter.hasNext())
{
+ hasElements = true;
MessageReference ref = iter.next();
try
{
@@ -1243,7 +1245,8 @@
}
}
- if (expired && pageIterator != null &&
pageIterator.hasNext())
+ // If empty we need to schedule depaging to make sure we would depage
expired messages as well
+ if ((!hasElements || expired && pageIterator != null)
&& pageIterator.hasNext())
{
scheduleDepage();
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-06
14:29:24 UTC (rev 12086)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07
02:46:06 UTC (rev 12087)
@@ -138,6 +138,7 @@
backupConfig.setSharedStore(true);
backupConfig.setBackup(true);
backupConfig.setClustered(true);
+ backupConfig.setMessageExpiryScanPeriod(100);
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector =
getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
@@ -156,6 +157,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
+ liveConfig.setMessageExpiryScanPeriod(100);
List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
pairs, false);
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-06
14:29:24 UTC (rev 12086)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07
02:46:06 UTC (rev 12087)
@@ -31,6 +31,7 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -202,6 +203,68 @@
}
}
+ public void testExpireMessage() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+
+ session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS,
true);
+
+ ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+ final int TOTAL_MESSAGES = 1000;
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ msg.setExpiration(System.currentTimeMillis() + 1000);
+ prod.send(msg);
+ }
+
+ crash(session);
+
+ session.close();
+
+ Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+ long timeout = System.currentTimeMillis() + 60000;
+ System.out.println("Starting now");
+ while (timeout > System.currentTimeMillis() &&
queue.getPageSubscription().isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ try
+ {
+ assertFalse(queue.getPageSubscription().isPaging());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+
/**
* @param session
* @param latch
Show replies by date